# === cds_migrator_kit/rdm/cli.py — additions ===
# New imports (grouped with the other cds_migrator_kit imports at the top of cli.py).
from cds_migrator_kit.rdm.comments.runner import CommenterRunner, CommentsRunner
from cds_migrator_kit.rdm.comments.streams import (
    CommenterStreamDefinition,
    CommentsStreamDefinition,
)


@migration.group()
def comments():
    """Migration CLI for comments."""


@comments.command()
@click.option(
    "--dry-run",
    is_flag=True,
)
@click.option(
    "--filepath",
    help="Path to the comments metadata json file.",
    required=True,
)
@click.option(
    "--dirpath",
    help="Path to the record-wise comments directory containing attached files.",
    required=True,
)
@with_appcontext
def comments_run(filepath, dirpath, dry_run=False):
    """Migrate the comments for the records in `filepath`.

    :param filepath: path to ``comments_metadata.json`` produced by the
        legacy dump script.
    :param dirpath: directory containing the per-record attachment folders
        (``<dirpath>/<legacy recid>/<comment id>/``).
    :param dry_run: when set, the load step should not persist anything.
    """
    log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments"
    runner = CommentsRunner(
        stream_definition=CommentsStreamDefinition,
        filepath=filepath,
        dirpath=dirpath,
        log_dir=log_dir,
        dry_run=dry_run,
    )
    runner.run()


@comments.command()
@click.option(
    "--dry-run",
    is_flag=True,
)
@click.option(
    "--filepath",
    help="Path to the users metadata json file.",
    required=True,
)
@click.option(
    "--missing-users-filepath",
    help="Path to the people.csv file containing person_id for missing users.",
    default=None,
)
@with_appcontext
def commenters_run(filepath, missing_users_filepath, dry_run=False):
    """Pre-create commenters accounts.

    BUG FIX: click passes the ``--missing-users-filepath`` option to the
    callback as the ``missing_users_filepath`` keyword argument; the previous
    signature declared ``users_dir_path`` instead, which raised a ``TypeError``
    every time the command was invoked.
    """
    log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "comments"
    runner = CommenterRunner(
        stream_definition=CommenterStreamDefinition,
        filepath=filepath,
        missing_users_dir=missing_users_filepath,
        log_dir=log_dir,
        dry_run=dry_run,
    )
    runner.run()


# === new file: cds_migrator_kit/rdm/comments/extract.py ===
# -*- coding: utf-8 -*-
#
# Copyright (C) 2026 CERN.
#
# CDS-Migrator-Kit is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-Migrator-Kit comments extract module."""

import json
from pathlib import Path

import click
from invenio_rdm_migrator.extract import Extract


class LegacyCommentsExtract(Extract):
    """Yield ``(recid, comments)`` pairs from the dumped comments metadata JSON."""

    def __init__(self, filepath, **kwargs):
        """Constructor.

        :param filepath: path to ``comments_metadata.json`` — a mapping of
            legacy recid to a list of comment dicts.
        """
        self.filepath = Path(filepath).absolute()

    def run(self):
        """Yield one ``(recid, comments)`` tuple per legacy record."""
        with open(self.filepath, "r") as dump_file:
            data = json.load(dump_file)
        with click.progressbar(data.items(), label="Processing comments") as metadata:
            for recid, comments in metadata:
                yield (recid, comments)


class LegacyCommentersExtract(Extract):
    """Yield one ``{"submitter": email}`` dict per user in the users metadata JSON."""

    def __init__(self, filepath, **kwargs):
        """Constructor.

        :param filepath: path to the users metadata JSON (a list of user dicts
            with at least an ``email`` key).
        """
        self.filepath = Path(filepath).absolute()

    def run(self):
        """Yield the submitter email of each dumped user."""
        with open(self.filepath, "r") as dump_file:
            data = json.load(dump_file)
        with click.progressbar(data) as metadata:
            for user_data in metadata:
                click.secho(
                    f"Processing commenters: {user_data['email']}",
                    fg="green",
                    bold=True,
                )
                yield {"submitter": user_data["email"]}
# === new file: cds_migrator_kit/rdm/comments/load.py ===
# -*- coding: utf-8 -*-
#
# Copyright (C) 2026 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM migration load module."""

import os

import arrow
from cds_rdm.legacy.resolver import get_pid_by_legacy_recid
from flask import current_app, url_for
from invenio_access.permissions import system_identity
from invenio_accounts.models import User
from invenio_db.uow import UnitOfWork
from invenio_rdm_migrator.load.base import Load
from invenio_rdm_records.proxies import current_rdm_records_service
from invenio_rdm_records.records.api import RDMParent
from invenio_rdm_records.requests import CommunitySubmission
from invenio_records_resources.services.uow import RecordCommitOp
from invenio_requests.customizations.event_types import CommentEventType, LogEventType
from invenio_requests.proxies import current_events_service, current_requests_service
from invenio_requests.records.api import RequestEventFormat
from invenio_requests.resolvers.registry import ResolverRegistry

from cds_migrator_kit.errors import ManualImportRequired
from cds_migrator_kit.rdm.comments.log import CommentsLogger

logger = CommentsLogger.get_logger()


class CDSCommentsLoad(Load):
    """Load legacy comments as events on an accepted community-submission request.

    For each legacy record, an already-accepted ``CommunitySubmission`` request
    is created on the oldest published version, and every legacy comment/reply
    is replayed as a request event with its original author and timestamp.
    """

    def __init__(
        self,
        dirpath,
        dry_run=False,
    ):
        """Constructor.

        :param dirpath: directory with the comment attachments, laid out as
            ``<dirpath>/<legacy recid>/<comment id>/<files>`` (see
            ``scripts/copy_comments_attached_files.py``).
        :param dry_run: stored for symmetry with the other loaders.
        """
        self.dirpath = dirpath  # The directory path where the attached files are stored
        self.dry_run = dry_run
        # Per-record cache: version index -> record projection. Filled by
        # `get_oldest_record`, read by `create_event` for file deep links.
        self.all_record_versions = {}
        # Map of legacy reply comment id -> migrated RDM event id.
        # BUG FIX: this was a *class* attribute before — a shared mutable that
        # leaked state across loader instances and across runs.
        self.legacy_reply_link_map = {}

    def get_attached_files_for_comment(self, recid, comment_id):
        """Return the file names attached to a comment, if any.

        :param recid: the *legacy* recid (the on-disk layout is keyed by it).
        :param comment_id: the legacy comment id.
        """
        attached_files_directory = os.path.join(
            self.dirpath, str(recid), str(comment_id)
        )
        if os.path.exists(attached_files_directory):
            return os.listdir(attached_files_directory)
        return []

    def get_oldest_record(self, parent_pid_value):
        """Return the oldest published version of the record.

        Side effect: (re)populates ``self.all_record_versions`` with every
        version of *this* record, for later file-version deep links.
        """
        # BUG FIX: reset the cache per record — previously versions of the
        # previously processed record accumulated here, so `min()` could
        # return a version belonging to a different record.
        self.all_record_versions = {}
        latest_record = current_rdm_records_service.read_latest(
            identity=system_identity, id_=parent_pid_value
        )
        search_result = current_rdm_records_service.scan_versions(
            system_identity,
            latest_record["id"],
        )
        for hit in search_result.hits:
            # Keys are ints (version index) — the lookup side converts to int
            # as well (see `create_event`); previously the lookup used
            # `str(version)` and therefore never matched.
            self.all_record_versions[hit["versions"]["index"]] = hit
        oldest_version_index = min(self.all_record_versions.keys())
        return self.all_record_versions[oldest_version_index]

    def create_event(
        self,
        request,
        data,
        community,
        record,
        uow,
        parent_comment_id=None,
        legacy_recid=None,
    ):
        """Create one request event (comment or deletion log) from legacy data.

        :param request: the community-submission request the event belongs to.
        :param data: the legacy comment dict.
        :param community: the record's default community (unused here, kept
            for signature stability with the caller).
        :param record: the oldest RDM record version projection.
        :param uow: the unit of work the commit op is registered on.
        :param parent_comment_id: RDM event id of the parent comment, for replies.
        :param legacy_recid: legacy recid used to locate attached files on disk;
            falls back to ``record["id"]`` when not given.
        """
        logger.info(
            "Creating event for record ID<{}> request ID<{}> comment ID<{}> parent_comment ID<{}>".format(
                record["id"],
                request.id,
                data.get("comment_id"),
                "self" if not parent_comment_id else parent_comment_id,
            )
        )

        # Create comment event
        comment_payload = {
            "payload": {
                "content": data.get("content"),
                "format": RequestEventFormat.HTML.value,
            }
        }

        # Legacy statuses: "da" = deleted by author, "dm" = deleted by moderator.
        # Those become LogEventType tombstones instead of regular comments.
        comment_status = data.get("status")
        event_type = CommentEventType
        if comment_status == "da":
            comment_payload["payload"].update(
                {
                    "content": "comment was deleted by the author.",
                    "event_type": "comment_deleted",
                }
            )
            event_type = LogEventType
        elif comment_status == "dm":
            comment_payload["payload"].update(
                {
                    "content": "comment was deleted by the moderator.",
                    "event_type": "comment_deleted",
                }
            )
            event_type = LogEventType

        event = current_events_service.record_cls.create(
            {}, request=request.model, request_id=str(request.id), type=event_type
        )

        # If the comment is attached to a record file, add a link to the file
        # in the content.
        if data.get("file_relation"):
            file_relation = data.get("file_relation")
            file_id = file_relation.get("file_id")
            version = file_relation.get("version")
            # BUG FIX: the cache is keyed by int version indices but the
            # legacy value may arrive as str — normalize before the lookup
            # (previously `str(version)` was used and never matched).
            try:
                record_version = self.all_record_versions.get(int(version))
            except (TypeError, ValueError):
                record_version = None
            if record_version:
                record_url = url_for(
                    "invenio_app_rdm_records.record_detail",
                    pid_value=record_version["id"],
                    preview_file=file_id,
                )
                # NOTE(review): the original anchor markup was lost in this
                # patch; reconstructed as a plain HTML link — confirm the
                # intended wording/markup against the legacy rendering.
                version_link = (
                    f'<a href="{record_url}">Comment on a file of a previous '
                    f"record version</a>"
                )
                comment_payload["payload"]["content"] = (
                    version_link + "\n" + comment_payload["payload"]["content"]
                )

        if parent_comment_id:
            logger.info(
                "Found parent event<{}> for reply event<{}>. Setting parent_id.".format(
                    event.id,
                    parent_comment_id,
                )
            )
            # If it's a reply: 1. set parent comment id, 2. and if a nested
            # reply, add a deep link to the mentioned reply event's content.
            event.parent_id = str(parent_comment_id)
            mentioned_event_id = self.legacy_reply_link_map.get(
                data.get("reply_to_id"), None
            )
            if mentioned_event_id:
                logger.info(
                    "Adding deep link to the content for the deeply nested reply event<{}>.".format(
                        mentioned_event_id,
                    )
                )
                # NOTE(review): the original deep-link markup was lost in this
                # patch (it read ``f""``); reconstructed as an in-page anchor —
                # confirm against the UI's event anchor scheme.
                deep_link = f'<a href="#{mentioned_event_id}">In reply to</a>'
                comment_payload["payload"]["content"] = (
                    deep_link + "\n" + comment_payload["payload"]["content"]
                )

        # TODO: Add attached files to the event
        # https://github.com/CERNDocumentServer/cds-migrator-kit/issues/381
        # For now, if attached files are found, raise ManualImportRequired.
        # BUG FIX: look the files up by the *legacy* recid — the attachment
        # directories are laid out by legacy recid (see the copy script), so
        # using the RDM pid here silently never found any attachments.
        lookup_recid = legacy_recid if legacy_recid is not None else record["id"]
        attached_files = self.get_attached_files_for_comment(
            lookup_recid, data.get("comment_id")
        )
        if attached_files:
            raise ManualImportRequired(
                f"Attached files found for comment ID<{data.get('comment_id')}>."
            )

        event.update(comment_payload)
        user = User.query.filter_by(email=data.get("user_email")).one_or_none()
        if user:
            event.created_by = ResolverRegistry.resolve_entity_proxy(
                {"user": str(user.id)}, raise_=True
            )
        else:
            raise ManualImportRequired(
                f"User not found for email: {data.get('user_email')}"
            )
        # Preserve the legacy timestamp (stored naive — assumes legacy dates
        # share the DB's timezone; TODO confirm).
        created_at = arrow.get(data.get("created_at")).datetime.replace(tzinfo=None)
        event.model.created = created_at
        event.model.version_id = 0

        # Since we are not using the services to create the event, we need to
        # register the commit operation manually for indexing.
        uow.register(RecordCommitOp(event, indexer=current_events_service.indexer))

        return event

    def create_accepted_community_submission_request(
        self,
        record,
        community,
        creator_user_id,
        comments=None,
        legacy_recid=None,
    ):
        """Create an accepted community submission request carrying the comments.

        Returns ``None`` (and logs a warning) when there are no comments.
        """
        if not comments:
            logger.warning(
                f"No comments found for record<{record['id']}>. Skipping request creation."
            )
            return None

        # Resolve entities for references
        creator_ref = ResolverRegistry.reference_entity(
            {"user": str(creator_user_id)}, raise_=True
        )
        receiver_ref = ResolverRegistry.reference_entity(
            {"community": str(community.id)}, raise_=True
        )
        topic_ref = ResolverRegistry.reference_entity(
            {"record": record["id"]}, raise_=True
        )

        with UnitOfWork() as uow:
            request_item = current_requests_service.create(
                system_identity,
                data={
                    "title": record["metadata"]["title"],
                },
                request_type=CommunitySubmission,
                receiver=receiver_ref,
                creator=creator_ref,
                topic=topic_ref,
                uow=uow,
            )
            request = request_item._record
            request.status = "accepted"
            created_at = arrow.get(record["created"]).datetime.replace(tzinfo=None)
            request.model.created = created_at

            logger.info(
                f"Created accepted community submission request<{request.id}> for record<{record['id']}>."
            )

            for comment_data in comments:
                comment_event = self.create_event(
                    request, comment_data, community, record, uow,
                    legacy_recid=legacy_recid,
                )
                for reply in comment_data.get("replies", []):
                    reply_event = self.create_event(
                        request,
                        reply,
                        community,
                        record,
                        uow,
                        parent_comment_id=comment_event.id,
                        legacy_recid=legacy_recid,
                    )
                    self.legacy_reply_link_map[reply.get("comment_id")] = (
                        reply_event.id
                    )

            # Commit at the end to rollback on any error — for the request
            # AND all of its comment events together.
            uow.commit()

        return request

    def _process_legacy_comments_for_recid(self, recid, comments):
        """Process the legacy comments for one record."""
        logger.info(f"Processing legacy comments for recid: {recid}")
        parent_pid = get_pid_by_legacy_recid(recid)
        oldest_record = self.get_oldest_record(parent_pid.pid_value)
        parent = RDMParent.pid.resolve(parent_pid.pid_value)
        community = parent.communities.default
        record_owner_id = parent.access.owned_by.owner_id
        request = self.create_accepted_community_submission_request(
            oldest_record,
            community,
            record_owner_id,
            comments,
            legacy_recid=recid,
        )
        return request

    def _load(self, entry):
        """Use the services to load one ``(recid, comments)`` entry."""
        if entry:
            recid, comments = entry
            try:
                self._process_legacy_comments_for_recid(recid, comments)
            except Exception:
                # Log with traceback (plain `.error(ex)` lost the stack).
                logger.exception(
                    "Failed to load comments for recid %s", recid
                )

    def _cleanup(self, *args, **kwargs):
        """Cleanup the entries."""
        pass
+ +"""CDS-Migrator-Kit comments logger module.""" + +import logging + + +class CommentsLogger: + """Migrator comments logger.""" + + @classmethod + def initialize(cls, log_dir): + """Constructor.""" + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + logger = logging.getLogger("comments-migrator") + fh = logging.FileHandler(log_dir / "info.log") + logger.setLevel(logging.WARNING) + logger.addHandler(fh) + + # errors to file + fh = logging.FileHandler(log_dir / "error.log") + fh.setLevel(logging.ERROR) + fh.setFormatter(formatter) + logger.addHandler(fh) + + # info to stream/stdout + sh = logging.StreamHandler() + sh.setFormatter(formatter) + sh.setLevel(logging.INFO) + logger.addHandler(sh) + + @classmethod + def get_logger(cls): + """Get migration logger.""" + return logging.getLogger("comments-migrator") diff --git a/cds_migrator_kit/rdm/comments/runner.py b/cds_migrator_kit/rdm/comments/runner.py new file mode 100644 index 0000000..b6548dc --- /dev/null +++ b/cds_migrator_kit/rdm/comments/runner.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-Migrator-Kit is free software; you can redistribute it and/or modify +# it under the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-Migrator-Kit comments runner module.""" + +from pathlib import Path + +from invenio_rdm_migrator.streams import Stream + +from cds_migrator_kit.rdm.comments.log import CommentsLogger +from cds_migrator_kit.rdm.users.api import CDSMigrationUserAPI + + +class CommentsRunner: + """ETL streams runner.""" + + def __init__(self, stream_definition, filepath, dirpath, log_dir, dry_run): + """Constructor.""" + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + + CommentsLogger.initialize(self.log_dir) + + self.stream = Stream( + stream_definition.name, + extract=stream_definition.extract_cls(filepath), + transform=stream_definition.transform_cls(), + load=stream_definition.load_cls(dirpath=dirpath, dry_run=dry_run), + ) + + def run(self): + """Run comments ETL stream.""" + try: + self.stream.run() + except Exception as e: + CommentsLogger.get_logger().exception( + f"Stream {self.stream.name} failed.", exc_info=1 + ) + + +class CommenterRunner: + """ETL streams runner dedicated to pre-create commenters accounts.""" + + def __init__( + self, stream_definition, filepath, missing_users_dir, log_dir, dry_run + ): + """Constructor.""" + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + + CommentsLogger.initialize(self.log_dir) + + self.stream = Stream( + stream_definition.name, + extract=stream_definition.extract_cls(filepath), + transform=stream_definition.transform_cls(), + load=stream_definition.load_cls( + dry_run=dry_run, + missing_users_dir=missing_users_dir, + logger=CommentsLogger.get_logger(), + user_api_cls=CDSMigrationUserAPI, + ), + ) + + def run(self): + """Run commenters ETL stream.""" + self.stream.run() diff --git a/cds_migrator_kit/rdm/comments/streams.py b/cds_migrator_kit/rdm/comments/streams.py new file mode 100644 index 0000000..4aba4dc --- /dev/null +++ b/cds_migrator_kit/rdm/comments/streams.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. 
# === new file: cds_migrator_kit/rdm/comments/streams.py ===
# -*- coding: utf-8 -*-
#
# Copyright (C) 2026 CERN.
#
# CDS-Migrator-Kit is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-Migrator-Kit comments streams module."""
from invenio_rdm_migrator.streams import StreamDefinition
from invenio_rdm_migrator.transform import IdentityTransform

from cds_migrator_kit.rdm.comments.extract import (
    LegacyCommentersExtract,
    LegacyCommentsExtract,
)
from cds_migrator_kit.users.load import CDSSubmitterLoad

from .load import CDSCommentsLoad

CommentsStreamDefinition = StreamDefinition(
    name="comments",
    extract_cls=LegacyCommentsExtract,
    transform_cls=IdentityTransform,
    load_cls=CDSCommentsLoad,
)
"""ETL stream for CDS to RDM comments."""

CommenterStreamDefinition = StreamDefinition(
    name="commenters",
    extract_cls=LegacyCommentersExtract,
    transform_cls=IdentityTransform,
    load_cls=CDSSubmitterLoad,
)
"""ETL stream for CDS to RDM commenters."""


# === new file: scripts/copy_comments_attached_files.py ===
"""
This script is used to copy the attached files from the old system to the migration folder in the new eos.

1. Read the attached files from the old system
2. Copy the attached files to the migration folder in the new eos system
3. Save the attached files mapping to a json file for attaching them to the comments in the new system
"""

# from invenio.webcomment import get_attached_files
import json
import os
import shutil

collection = "it_meetings"
environment = "sandbox"

source_prefix = "/opt/cdsweb/var/data/comments"
destination_prefix = "/eos/media/cds/cds-rdm/{0}/migration/{1}/comments".format(
    environment, collection
)
"""
collection_name/
|-- comments/
    |-- comments_metadata.json
    |-- recid/
        |-- comment_id (or reply_comment_id)/
            |-- This is where the attached file is copied to
        |-- ...
(We keep the recid folder to avoid confusion with the files folder and in case different comments contain the same file name)
(We keep the comment_id (or reply_comment_id) folder to avoid confusion with the files folder and in case different comments contain the same file name)
"""


def copy_comments_attached_files(comments_metadata):
    """Copy each record's legacy attachments directory to the EOS destination.

    Robustness fixes over the first version:
    - a record with comments but no attachments has no source directory on
      legacy; skip it instead of letting ``copytree`` raise;
    - skip records already copied (destination exists) so the script can be
      safely re-run after a partial failure.
    """
    for recid in comments_metadata.keys():
        source = os.path.join(source_prefix, recid)
        destination = os.path.join(destination_prefix, recid)
        if not os.path.isdir(source):
            print("No attachments directory for record {}; skipping".format(recid))
            continue
        if os.path.exists(destination):
            print("Record {} already copied to {}; skipping".format(recid, destination))
            continue
        # Copy the whole /comments/{recid} folder to the destination folder
        shutil.copytree(source, destination)
        print("Copied {} comments directory to {}".format(recid, destination_prefix))


# Load the comments metadata to get the recids with comments and the comment IDs
comments_metadata_json_file = os.path.join(destination_prefix, "comments_metadata.json")
with open(comments_metadata_json_file, "r") as f:
    comments_metadata = json.load(f)

copy_comments_attached_files(comments_metadata)
# === new file: scripts/dump_comments_to_migrate.py ===
"""
This script is used to get the comments from the legacy system and save them to a json file.
It also dumps the users metadata into valid_users.json and missing_users.json files.

1. Find migrated records with comments
2. Extract and sanitize comments (query_retrieve_comments_or_remarks)
3. Map legacy user id to cdsrdm user id
4. Create comments metadata
5. Save comments metadata to json file
6. Dump the users metadata into valid_users.json and missing_users.json files.
"""

import json
import os

from invenio.bibcirculation_cern_ldap import get_user_info_from_ldap
from invenio.dbquery import run_sql
from invenio.search_engine import search_pattern
from invenio.webcomment_dblayer import get_comment_to_bibdoc_relations

ENV = "dev"
BASE_OUTPUT_DIR = f"/eos/media/cds/cds-rdm/{ENV}/migration/"
COMMENTS_METADATA_FILEPATH = os.path.join(
    BASE_OUTPUT_DIR, "comments", "comments_metadata.json"
)

collection_queries = [
    "980__a:CNLISSUE -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980__a:CNLARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:ARTICLE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL",
    "710__5:CN OR 710__5:DD OR 710__5:IT OR 710__5:AS OR 710__5:STS 980:PREPRINT -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL",
    "980:INTNOTEITPUBL or 980:INTNOTEASPUBL OR 980:INTNOTEMIPUBL AND 690C:INTNOTE -980:DELETED -980:HIDDEN -980__a:DUMMY -980:INTNOTECMSPUBL",
    "710__5:IT 980:PUBLARDA -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980:REPORT AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980:PERI AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980:BROCHURE AND 690C:CERNITBROCHURE -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980:POSTER AND 710__5:IT -980:DELETED -980:HIDDEN -980__a:DUMMY",
    "980:ITCERNTALK -980:DELETED -980:HIDDEN -980__a:DUMMY",
    '980:PERI AND 650:"Computing and Computers" -980:DELETED -980:HIDDEN -980__a:DUMMY',
    "980:ITUDSPUBSOURCEARCHIVE -980:DELETED -980:HIDDEN -980__a:DUMMY",
]
"""
For thesis, change the query and re-run this script: "(980__:THESIS OR 980__:Thesis OR 980__:thesis) -980__:DUMMY -980__c:HIDDEN"
"""
recids_list = []
print("Querying already migrated records...")
for i, collection_query in enumerate(collection_queries):
    recs = search_pattern(p=collection_query)
    print(
        "({} / {}) Found {} records for query `{}`".format(
            i + 1, len(collection_queries), len(recs), collection_query
        )
    )
    recids_list.extend(recs)

# Guard: an empty recid list would produce an invalid `IN ()` SQL clause below.
if not recids_list:
    raise SystemExit("No migrated records found for the configured queries.")

print("Filtering records with comments...")
records_with_comments = run_sql(
    "SELECT id_bibrec "
    "FROM cmtRECORDCOMMENT "
    "WHERE id_bibrec IN ({}) "
    "GROUP BY id_bibrec "
    "HAVING COUNT(*) > 0".format(",".join([str(recid) for recid in recids_list])),
)

recids_with_comments = [record[0] for record in records_with_comments]
print(
    "Found {} records with comments out of {} migrated records".format(
        len(recids_with_comments), len(recids_list)
    )
)


def get_comments_from_legacy(recid, display_order="cmt.date_creation ASC"):
    """Fetch all comments of a legacy record, joined with their authors.

    Adapted from ``invenio.webcomment.query_retrieve_comments_or_remarks``.
    NOTE(review): the quadruple ``%%%%`` in DATE_FORMAT is intentional — it
    must survive two substitution passes (the ``%`` string-format below, then
    the DB driver's param substitution); confirm before touching it.
    """
    query = """SELECT user.nickname,
                      user.email,
                      user.note,
                      user.last_login,
                      cmt.id_user,
                      DATE_FORMAT(cmt.date_creation, '%%%%Y-%%%%m-%%%%d %%%%H:%%%%i:%%%%s') as created,
                      cmt.body,
                      cmt.status,
                      cmt.nb_abuse_reports,
                      cmt.id,
                      cmt.round_name,
                      cmt.restriction,
                      %(reply_to_column)s as reply_to_id,
                      cmt.body_format
               FROM cmtRECORDCOMMENT cmt LEFT JOIN user ON
                                              user.id=cmt.id_user
               WHERE cmt.id_bibrec=%%s
               ORDER BY %(display_order)s
            """ % {
        "display_order": display_order,
        "reply_to_column": recid > 0
        and "cmt.in_reply_to_id_cmtRECORDCOMMENT"
        or "cmt.in_reply_to_id_bskRECORDCOMMENT",
    }
    params = (recid,)
    res = run_sql(query, params, with_dict=True)
    return res


def flatten_replies(comments_list):
    """Flatten arbitrarily nested comment replies into a 1-level replies list.

    Takes a list of comments (dicts with at least ``id`` and ``reply_to_id``)
    and returns the top-level comments, each carrying ALL of its direct and
    indirect replies (sorted by creation date) under a single ``replies`` key.
    """
    # Build maps for quick lookup
    comments_by_id = {c["id"]: dict(c, replies=[]) for c in comments_list}
    top_level_comments = []

    for c in comments_list:
        parent = c.get("reply_to_id")
        if parent is None or parent not in comments_by_id:
            # This is a top-level comment
            top_level_comments.append(comments_by_id[c["id"]])
        else:
            # This is a reply; add to parent's replies
            comments_by_id[parent]["replies"].append(comments_by_id[c["id"]])

    def collect_all_replies(comment):
        """Recursively flattens all indirect replies under this comment."""
        flat = []
        queue = list(comment["replies"])
        while queue:
            reply = queue.pop(0)
            flat.append(reply)
            # Add all replies to the end of the queue
            queue.extend(reply["replies"])
        # Remove their nested replies again for full flatten
        for r in flat:
            r["replies"] = []
        # Sort the replies by creation date
        flat.sort(key=lambda x: x["created"])
        comment["replies"] = flat

    for comment in top_level_comments:
        collect_all_replies(comment)

    return top_level_comments


comments_metadata = {}
"""
{recid: [{comment_id, content, status, user_email, created_at,
          file_relation: {file_id, version},
          replies: [{comment_id, content, status, user_email, created_at,
                     reply_to_id, file_relation}]}]}
"""

# BUG FIX: collect users keyed by legacy user id instead of appending one row
# per comment — the previous list accumulated duplicates, causing repeated
# LDAP lookups and duplicate entries in the dumped users JSON files.
users_by_id = {}

for i, recid in enumerate(recids_with_comments):
    print(
        "({}/{}) Processing comments for record<{}>".format(
            i + 1, len(recids_with_comments), recid
        )
    )
    comments = get_comments_from_legacy(recid)

    if not comments:
        print("No comments found for record<{}>. Skipping...".format(recid))
        continue

    # Check if the comments list have atleast 1 comment with 'ok' status or 'approved' status to avoid migrating records with spam comments that are already deleted
    # For eg.: https://cds.cern.ch/record/1367848/comments?ln=en
    if not any(comment["status"] in ["ok", "ap"] for comment in comments):
        print(
            "No comments with atleast one 'ok'/'ap' status found for record<{}>. Skipping...".format(
                recid
            )
        )
        continue

    print("Found `{}` comment(s) for record<{}>".format(len(comments), recid))
    comments_metadata[recid] = []

    # `get_comment_to_bibdoc_relations` is used to find if comments are attached to the record's files (and the version of the files)
    # This is not the same as the files attached to the comments
    comments_to_file_relations = get_comment_to_bibdoc_relations(recid)
    comment_to_version_relations = {}
    for relation in comments_to_file_relations:
        comment_to_version_relations[relation["id_comment"]] = {
            "file_id": relation["id_bibdoc"],
            "version": relation["version"],
        }
    print(
        "Found {} comments to file relations for record {} ...".format(
            len(comments_to_file_relations), recid
        )
    )

    for comment in comments:
        users_by_id[comment["id_user"]] = (
            comment["id_user"],
            comment["email"],
            comment["nickname"],
            comment["note"],
            comment["last_login"],
        )

    def _sanitize_body(raw_body):
        """Strip legacy artifacts (non-breaking spaces, newlines) from a body."""
        return raw_body.replace("\xc2\xa0", " ").replace("\n", "").strip()

    # Flatten the reply comments, then sanitize the metadata for RDM
    flattened_comments = flatten_replies(comments)
    for comment in flattened_comments:
        comment_data = {
            "comment_id": comment["id"],
            "content": _sanitize_body(comment["body"]),
            "status": comment.get("status"),
            "user_email": comment.get("email"),
            "created_at": comment.get("created"),
            "file_relation": comment_to_version_relations.get(comment["id"], {}),
            "replies": [
                {
                    "comment_id": reply["id"],
                    "content": _sanitize_body(reply["body"]),
                    "status": reply.get("status"),
                    "user_email": reply.get("email"),
                    "created_at": reply.get("created"),
                    "reply_to_id": reply.get("reply_to_id"),
                    "file_relation": comment_to_version_relations.get(reply["id"], {}),
                }
                for reply in comment["replies"]
            ],
        }
        comments_metadata[recid].append(comment_data)
    print("Successfully processed comment(s) for record<{}>!!!".format(recid))

with open(COMMENTS_METADATA_FILEPATH, "w") as f:
    json.dump(comments_metadata, f)
"""
This file will be read and run by the CommentsRunner to migrate the comments.
"""

users_metadata = list(users_by_id.values())
"""
[(user_id, user_email, user_nickname, user_note, user_last_login), ...]
"""

"""
The following snippet is taken from the `dump_users.py` script in the `production_scripts` repository:
https://gitlab.cern.ch/cds-team/production_scripts/-/blob/master/cds-rdm/migration/dump_users.py?ref_type=heads

It is used to dump the users metadata as valid_users.json and missing_users.json files.

After running this script, place the "active_users.json" and "missing_users.json" files in the "cds_migrator_kit/rdm/data/users/" folder along with "people.csv" file.
"""

OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "users")
USERS_FILEPATH = os.path.join(OUTPUT_DIR, "active_users.json")
MISSING_USERS_FILEPATH = os.path.join(OUTPUT_DIR, "missing_users.json")

if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)


def dump_users():
    """
    Dump the users metadata into valid_users.json and missing_users.json files.
    """

    def get_uid_from_ldap_user(ldap_user):
        # Narrowed from a bare `except:` — only absence/shape errors are expected.
        try:
            return ldap_user["uidNumber"][0]
        except (KeyError, IndexError, TypeError):
            return None

    def get_department_from_ldap_user(ldap_user):
        # Narrowed from a bare `except:` — only absence/shape errors are expected.
        try:
            return ldap_user["department"][0]
        except (KeyError, IndexError, TypeError):
            return None

    def _dump(recs):
        valid_users = []
        missing_users = []

        for rec in recs:
            # rec: (user_id, email, nickname, note, last_login)
            email = rec[1]
            ldap_user = get_user_info_from_ldap(email=email)
            uidNumber = get_uid_from_ldap_user(ldap_user)
            record = {
                "id": rec[0],
                "email": rec[1],
                "displayname": rec[2],
                "active": rec[3],
            }
            if uidNumber:
                record["uid"] = uidNumber
                department = get_department_from_ldap_user(ldap_user)
                if department:
                    record["department"] = department
                else:
                    print("No department for {}".format(email))
                valid_users.append(record)
            else:
                missing_users.append(record)

        return valid_users, missing_users

    valid_users, missing_users = _dump(users_metadata)
    with open(USERS_FILEPATH, "w") as fp:
        json.dump(valid_users, fp, indent=2)

    if missing_users:
        print("Missing users found {0}".format(len(missing_users)))
        with open(MISSING_USERS_FILEPATH, "w") as fp:
            json.dump(missing_users, fp, indent=2)


dump_users()

# === new file: tests/cds-rdm/data/comments/comments_metadata.json ===
# (fixture data, reproduced here for reference; content unchanged)
# {"12345": [{"comment_id": 1, "content": "This is a test comment",
#   "status": "ok", "user_email": "submitter13@cern.ch",
#   "created_at": "2026-01-10 10:00:00", "file_relation": {},
#   "replies": [{"comment_id": 2, "content": "This is a reply",
#     "status": "ok", "user_email": "submitter10@gmail.com",
#     "created_at": "2026-02-10 11:00:00", "reply_to_id": 1,
#     "file_relation": {}}]}]}
# -*- coding: utf-8 -*-
#
# Copyright (C) 2026 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""Tests for the comments migration workflow.

Exercises two runners, each in normal and ``dry_run`` mode:

* ``CommenterRunner`` — pre-creates accounts for legacy commenters listed in
  ``data/users/missing_users.json``.
* ``CommentsRunner`` — migrates legacy comments from
  ``data/comments/comments_metadata.json`` into request events attached to
  the migrated record.
"""

import json
import os
import tempfile

import pytest
from invenio_access.permissions import system_identity
from invenio_accounts.models import User
from invenio_db.uow import UnitOfWork
from invenio_rdm_records.proxies import current_rdm_records_service
from invenio_rdm_records.records.api import RDMParent
from invenio_requests.proxies import current_events_service, current_requests_service

from cds_migrator_kit.rdm.comments.runner import CommenterRunner, CommentsRunner
from cds_migrator_kit.rdm.comments.streams import (
    CommenterStreamDefinition,
    CommentsStreamDefinition,
)

# Legacy recid referenced by data/comments/comments_metadata.json.
LEGACY_RECD_ID = "12345"


def _data_path(*parts):
    """Return an absolute path below this test suite's ``data`` directory."""
    return os.path.join(os.path.dirname(__file__), "data", *parts)


@pytest.fixture
def temp_dir():
    """Yield a scratch directory that is removed when the test ends."""
    with tempfile.TemporaryDirectory() as tmpdir:
        yield tmpdir


@pytest.fixture
def migrated_record_with_comments(test_app, community, uploader, db, add_pid):
    """Create a published RDM record carrying the legacy ``lrecid`` PID.

    The comments runner resolves records by their legacy recid, so the
    fixture registers ``LEGACY_RECD_ID`` as an ``lrecid`` PID pointing at
    the record's parent.
    """
    minimal_record = {
        "metadata": {
            "title": "Test Record with Comments",
            "publication_date": "2026-01-01",
            "resource_type": {"id": "publication-article"},
            "creators": [
                {
                    "person_or_org": {
                        "type": "personal",
                        "name": "Test Author",
                        "given_name": "Test",
                        "family_name": "Author",
                    }
                }
            ],
        },
        "access": {
            "record": "public",
            "files": "public",
        },
        "files": {
            "enabled": False,
        },
        "media_files": {
            "enabled": False,
        },
    }

    draft = current_rdm_records_service.create(system_identity, minimal_record)
    record = current_rdm_records_service.publish(system_identity, draft.id)

    # Attach the record to the test community; the migrated comments end up
    # on a request scoped to the record's default community.
    parent = record._record.parent
    parent.communities.add(community)
    parent.communities.default = community
    parent.commit()

    # Register the legacy recid so the runner can look the record up.
    add_pid(
        pid_type="lrecid",
        pid_value=LEGACY_RECD_ID,
        object_uuid=parent.pid.object_uuid,
    )

    current_rdm_records_service.record_cls.index.refresh()
    return record


def test_create_users_from_metadata(temp_dir, db):
    """Commenter accounts listed in ``missing_users.json`` are created."""
    runner = CommenterRunner(
        stream_definition=CommenterStreamDefinition,
        filepath=_data_path("users", "missing_users.json"),
        missing_users_dir=_data_path("users"),
        log_dir=os.path.join(temp_dir, "logs"),
        dry_run=False,
    )
    runner.run()

    user1 = User.query.filter_by(email="submitter13@cern.ch").one_or_none()
    user2 = User.query.filter_by(email="submitter10@gmail.com").one_or_none()
    assert user1 is not None
    assert user2 is not None


def test_create_users_dry_run(temp_dir, db):
    """In dry-run mode the runner completes without creating any accounts."""
    runner = CommenterRunner(
        stream_definition=CommenterStreamDefinition,
        filepath=_data_path("users", "missing_users.json"),
        missing_users_dir=_data_path("users"),
        log_dir=os.path.join(temp_dir, "logs"),
        dry_run=True,
    )
    runner.run()

    # No account may have been persisted for either commenter.
    assert User.query.filter_by(email="submitter13@cern.ch").one_or_none() is None
    assert User.query.filter_by(email="submitter10@gmail.com").one_or_none() is None


def test_migrate_comments_from_metadata(temp_dir, migrated_record_with_comments, db):
    """Comments from ``comments_metadata.json`` become request events."""
    # The runner maps legacy commenter emails to existing accounts, so the
    # users must exist before the migration runs.
    db.session.add(User(email="submitter13@cern.ch", active=True))
    db.session.add(User(email="submitter10@gmail.com", active=True))
    db.session.commit()

    # Directory that would hold comment attachments (empty for this fixture).
    comments_dir = _data_path("comments")
    os.makedirs(comments_dir, exist_ok=True)

    runner = CommentsRunner(
        stream_definition=CommentsStreamDefinition,
        filepath=_data_path("comments", "comments_metadata.json"),
        dirpath=comments_dir,
        log_dir=os.path.join(temp_dir, "logs"),
        dry_run=False,
    )
    runner.run()

    # Exactly one request must have been created for the migrated record.
    current_requests_service.record_cls.index.refresh()
    result = current_requests_service.search(
        identity=system_identity,
        q=f"topic.record:{migrated_record_with_comments['id']}",
    )
    assert result.total == 1
    request = next(result.hits)

    # The request carries the migrated comments as events.
    current_events_service.record_cls.index.refresh()
    events = current_events_service.search(
        identity=system_identity,
        request_id=request["id"],
    )
    assert events.total == 2  # one top-level comment and one reply


def test_migrate_comments_dry_run(temp_dir, migrated_record_with_comments):
    """In dry-run mode no request (and hence no comment events) is created."""
    comments_dir = _data_path("comments")
    os.makedirs(comments_dir, exist_ok=True)

    runner = CommentsRunner(
        stream_definition=CommentsStreamDefinition,
        filepath=_data_path("comments", "comments_metadata.json"),
        dirpath=comments_dir,
        log_dir=os.path.join(temp_dir, "logs"),
        dry_run=True,
    )
    runner.run()

    # The runner must complete without persisting a request for the record.
    current_requests_service.record_cls.index.refresh()
    result = current_requests_service.search(
        identity=system_identity,
        q=f"topic.record:{migrated_record_with_comments['id']}",
    )
    assert result.total == 0