diff --git a/README.md b/README.md index 6c570bf..2a42f18 100644 --- a/README.md +++ b/README.md @@ -14,14 +14,12 @@ pip install -r requirements.txt cp .archivessnake.yml.example .archivessnake.yml nano .archivessnake.yml # Add your ArchivesSpace credentials -# 3. Set environment variables -export ARCLIGHT_DIR=/path/to/your/arclight-app -export ASPACE_DIR=/path/to/your/archivesspace -export SOLR_URL=http://localhost:8983/solr/blacklight-core - -# 4. Run arcflow -python -m arcflow.main - +# 3. Run arcflow +python -m arcflow.main \ + --arclight-dir /path/to/your/arclight-app \ + --aspace-dir /path/to/your/archivesspace \ + --solr-url http://localhost:8983/solr/blacklight-core \ + --aspace-solr-url http://localhost:8983/solr/archivesspace ``` --- @@ -42,14 +40,32 @@ ArcFlow now generates standalone creator documents in addition to collection rec - Link to all collections where the creator is listed - Can be searched and displayed independently in ArcLight - Are marked with `is_creator: true` to distinguish from collections -- Must be fed into a Solr instance with fields to match their specific facets (See:Configure Solr Schema below ) +- Must be fed into a Solr instance with fields to match their specific facets (See: Configure Solr Schema below) + +### Agent Filtering + +**ArcFlow automatically filters agents to include only legitimate creators** of archival materials. 
The following agent types are **excluded** from indexing: + +- ✗ **System users** - ArchivesSpace software users (identified by `is_user` field) +- ✗ **System-generated agents** - Auto-created for users (identified by `system_generated` field) +- ✗ **Software agents** - Excluded by not querying the `/agents/software` endpoint +- ✗ **Repository agents** - Corporate entities representing the repository itself (identified by `is_repo_agent` field) +- ✗ **Donor-only agents** - Agents with only the 'donor' role and no creator role + +**Agents are included if they meet any of these criteria:** + +- ✓ Have the **'creator' role** in linked_agent_roles +- ✓ Are **linked to published records** (and not excluded by filters above) + +This filtering ensures that only legitimate archival creators are discoverable in ArcLight, while protecting privacy and security by excluding system users and donors. ### How Creator Records Work 1. **Extraction**: `get_all_agents()` fetches all agents from ArchivesSpace -2. **Processing**: `task_agent()` generates an EAC-CPF XML document for each agent with bioghist notes -3. **Linking**: Handled via Solr using the persistent_id field (agents and collections linked through bioghist references) -4. **Indexing**: Creator XML files are indexed to Solr using `traject_config_eac_cpf.rb` +2. **Filtering**: `is_target_agent()` filters out system users, donors, and non-creator agents +3. **Processing**: `task_agent()` generates an EAC-CPF XML document for each target agent with bioghist notes +4. **Linking**: Handled via Solr using the persistent_id field (agents and collections linked through bioghist references) +5. **Indexing**: Creator XML files are indexed to Solr using `traject_config_eac_cpf.rb` ### Creator Document Format @@ -119,7 +135,22 @@ This is a **one-time setup** per Solr instance. 
--- -To index creator documents to Solr: +### Traject Configuration for Creator Indexing + +The `traject_config_eac_cpf.rb` file defines how EAC-CPF creator records are mapped to Solr fields. + +**Search Order**: arcflow searches for the traject config following the collection records pattern: +1. **arcuit_dir parameter** (if provided via `--arcuit-dir`) - Highest priority, most up-to-date user control +2. **arcuit gem** (via `bundle show arcuit`) - For backward compatibility when arcuit_dir not provided +3. **example_traject_config_eac_cpf.rb** in arcflow - Fallback for module usage without arcuit + +**Example File**: arcflow includes `example_traject_config_eac_cpf.rb` as a reference implementation. For production: +- Copy this file to your arcuit gem as `traject_config_eac_cpf.rb`, or +- Specify the location with `--arcuit-dir /path/to/arcuit` + +**Logging**: arcflow clearly logs which traject config file is being used when creator indexing runs. + +To index creator documents to Solr manually: ```bash bundle exec traject \ @@ -166,7 +197,9 @@ Optional arguments: python -m arcflow.main \ --arclight-dir /path/to/arclight \ --aspace-dir /path/to/archivesspace \ - --solr-url http://localhost:8983/solr/blacklight-core + --solr-url http://localhost:8983/solr/blacklight-core \ + --aspace-solr-url http://localhost:8983/solr/archivesspace + ``` **Process only agents (skip collections):** @@ -175,6 +208,7 @@ python -m arcflow.main \ --arclight-dir /path/to/arclight \ --aspace-dir /path/to/archivesspace \ --solr-url http://localhost:8983/solr/blacklight-core \ + --aspace-solr-url http://localhost:8983/solr/archivesspace \ --agents-only ``` @@ -184,6 +218,7 @@ python -m arcflow.main \ --arclight-dir /path/to/arclight \ --aspace-dir /path/to/archivesspace \ --solr-url http://localhost:8983/solr/blacklight-core \ + --aspace-solr-url http://localhost:8983/solr/archivesspace \ --force-update ``` diff --git a/arcflow/main.py b/arcflow/main.py index 292d049..c3baa1f 100644 --- 
a/arcflow/main.py +++ b/arcflow/main.py @@ -16,8 +16,8 @@ from datetime import datetime, timezone from asnake.client import ASnakeClient from multiprocessing.pool import ThreadPool as Pool -from .utils.stage_classifications import extract_labels - +from utils.stage_classifications import extract_labels +import glob base_dir = os.path.abspath((__file__) + "/../../") log_file = os.path.join(base_dir, 'logs/arcflow.log') @@ -40,8 +40,9 @@ class ArcFlow: """ - def __init__(self, arclight_dir, aspace_dir, solr_url, traject_extra_config='', force_update=False, agents_only=False, collections_only=False, arcuit_dir=None, skip_creator_indexing=False): + def __init__(self, arclight_dir, aspace_dir, solr_url, aspace_solr_url, traject_extra_config='', force_update=False, agents_only=False, collections_only=False, arcuit_dir=None, skip_creator_indexing=False, pdf_timeout_queued=300, pdf_timeout_running=1800): self.solr_url = solr_url + self.aspace_solr_url = aspace_solr_url self.batch_size = 1000 clean_extra_config = traject_extra_config.strip() self.traject_extra_config = clean_extra_config or None @@ -53,6 +54,12 @@ def __init__(self, arclight_dir, aspace_dir, solr_url, traject_extra_config='', self.collections_only = collections_only self.arcuit_dir = arcuit_dir self.skip_creator_indexing = skip_creator_indexing + self.agents_only = agents_only + self.collections_only = collections_only + self.arcuit_dir = arcuit_dir + self.skip_creator_indexing = skip_creator_indexing + self.pdf_timeout_queued = pdf_timeout_queued # Timeout for jobs stuck in "queued" status + self.pdf_timeout_running = pdf_timeout_running # Timeout for jobs actively "running" self.log = logging.getLogger('arcflow') self.pid = os.getpid() self.pid_file_path = os.path.join(base_dir, 'arcflow.pid') @@ -217,6 +224,9 @@ def task_resource(self, repo, resource_id, xml_dir, pdf_dir, indent_size=0): 'resolve': ['classifications', 'classification_terms', 'linked_agents'], }).json() + if "ead_id" not in resource: + 
self.log.error(f'{indent}Resource {resource_id} is missing an ead_id. Skipping.') + return pdf_job xml_file_path = f'{xml_dir}/{resource["ead_id"]}.xml' # replace dots with dashes in EAD ID to avoid issues with Solr @@ -239,23 +249,23 @@ def task_resource(self, repo, resource_id, xml_dir, pdf_dir, indent_size=0): if xml.content: xml_content = xml.content.decode('utf-8') insert_pos = xml_content.find('') - + if insert_pos != -1: # Find the position after the closing tag did_end_pos = xml_content.find('', insert_pos) - + if did_end_pos != -1: # Move to after the tag did_end_pos += len('') extra_xml = '' - + # Add record group and subgroup labels rg_label, sg_label = extract_labels(resource)[1:3] if rg_label: extra_xml += f'\n{xml_escape(rg_label)}' if sg_label: extra_xml += f'\n{xml_escape(sg_label)}' - + # Handle biographical/historical notes from creator agents bioghist_content = self.get_creator_bioghist(resource, indent_size=indent_size) if bioghist_content: @@ -263,26 +273,26 @@ def task_resource(self, repo, resource_id, xml_dir, pdf_dir, indent_size=0): # Search for existing bioghist after but before archdesc_end = xml_content.find('', did_end_pos) search_section = xml_content[did_end_pos:archdesc_end] if archdesc_end != -1 else xml_content[did_end_pos:] - + # Look for closing tag existing_bioghist_end = search_section.rfind('') - + if existing_bioghist_end != -1: # Found existing bioghist - insert agent elements INSIDE it (before closing tag) insert_pos = did_end_pos + existing_bioghist_end - xml_content = (xml_content[:insert_pos] + - f'\n{bioghist_content}\n' + + xml_content = (xml_content[:insert_pos] + + f'\n{bioghist_content}\n' + xml_content[insert_pos:]) else: # No existing bioghist - wrap agent elements in parent container wrapped_content = f'\n{bioghist_content}\n' extra_xml += f'\n{wrapped_content}' - + if extra_xml: - xml_content = (xml_content[:did_end_pos] + - extra_xml + + xml_content = (xml_content[:did_end_pos] + + extra_xml + 
xml_content[did_end_pos:]) - + xml_content = xml_content.encode('utf-8') else: xml_content = xml.content @@ -347,18 +357,105 @@ def task_repository(self, repo, xml_dir, modified_since, indent_size=0): def task_pdf(self, repo_uri, job_id, ead_id, pdf_dir, indent_size=0): + """ + Wait for an ArchivesSpace PDF generation job to complete and save the result. + + Uses a two-phase timeout approach: + - Phase 1 (queued): Short timeout to detect if background processor isn't running + - Phase 2 (running): Longer timeout to allow large PDFs to complete + + Args: + repo_uri: Repository URI + job_id: Job ID in ArchivesSpace + ead_id: EAD identifier for the resource + pdf_dir: Directory to save PDF files + indent_size: Indentation level for logging + + Returns: + True if successful, False if timeout or job failed + """ indent = ' ' * indent_size + start_time = time.time() + poll_interval = 5 # seconds between status checks + warning_threshold = 60 # warn if queued for more than 1 minute + last_warning_time = 0 + poll_count = 0 + + # Track status transitions for smart timeout + status_start_times = {} # Track when each status began + current_timeout = self.pdf_timeout_queued # Start with queued timeout + while True: - job_status = self.client.get( - f'{repo_uri}/jobs/{job_id}').json()['status'] + elapsed_time = time.time() - start_time + + # Check for timeout (with appropriate timeout for current status) + if elapsed_time > current_timeout: + # Determine which phase timed out for appropriate error message + last_status = list(status_start_times.keys())[-1] if status_start_times else 'queued' + + if last_status == 'queued': + self.log.error( + f'{indent}Timeout waiting for ArchivesSpace {self.job_type}_{job_id} ' + f'after {int(elapsed_time)} seconds. Job stuck in queued status.\n' + f'{indent}TROUBLESHOOTING: Background job processing may not be enabled in ArchivesSpace:\n' + f'{indent} 1. 
Check config/config.rb: AppConfig[:job_thread_count] must be > 0 (default is 2)\n' + f'{indent} 2. If you changed the config, restart ArchivesSpace: ./archivesspace.sh restart\n' + f'{indent} 3. Verify ArchivesSpace is processing jobs in the web UI: System → Background Jobs\n' + f'{indent} 4. Check ArchivesSpace logs for errors: logs/archivesspace.out\n' + f'{indent}Continuing without PDF for "{ead_id}"...' + ) + else: + self.log.error( + f'{indent}Timeout waiting for ArchivesSpace {self.job_type}_{job_id} ' + f'after {int(elapsed_time)} seconds in "{last_status}" status.\n' + f'{indent}This may be a very large PDF taking longer than expected.\n' + f'{indent}Consider increasing timeout with: --pdf-timeout-running {int(current_timeout * 2)}\n' + f'{indent}Continuing without PDF for "{ead_id}"...' + ) + + # Create empty PDF file and continue + self.save_file( + f'{pdf_dir}/{ead_id}.pdf', + b'', + 'PDF (empty - job timed out)', + indent_size=indent_size) + return False + + try: + job_status = self.client.get( + f'{repo_uri}/jobs/{job_id}').json()['status'] + except Exception as e: + self.log.error(f'{indent}Error checking job status for {self.job_type}_{job_id}: {e}') + time.sleep(poll_interval) + continue + + # Track status transitions and adjust timeout accordingly + if job_status not in status_start_times: + status_start_times[job_status] = time.time() + + # When job transitions from queued to running, switch to running timeout + if job_status == 'running' and 'queued' in status_start_times: + time_in_queued = status_start_times[job_status] - status_start_times['queued'] + # Reset timeout clock for running phase + start_time = time.time() + current_timeout = self.pdf_timeout_running + self.log.info( + f'{indent}Job {self.job_type}_{job_id} transitioned to "running" ' + f'after {int(time_in_queued)}s in queue. ' + f'Now allowing up to {int(self.pdf_timeout_running)}s for PDF generation...' 
+ ) if job_status in ('completed', 'canceled', 'failed'): if job_status == 'completed': - file_id = self.client.get( - f'{repo_uri}/jobs/{job_id}/output_files').json()[0] + try: + file_id = self.client.get( + f'{repo_uri}/jobs/{job_id}/output_files').json()[0] - pdf = self.client.get( - f'{repo_uri}/jobs/{job_id}/output_files/{file_id}') + pdf = self.client.get( + f'{repo_uri}/jobs/{job_id}/output_files/{file_id}') + except Exception as e: + self.log.error(f'{indent}Error retrieving PDF output for {self.job_type}_{job_id}: {e}') + pdf = None elif job_status in ('canceled', 'failed'): self.log.error(f'{indent}ArchivesSpace {self.job_type}_{job_id} {job_status}.') pdf = None @@ -386,11 +483,34 @@ def task_pdf(self, repo_uri, job_id, ead_id, pdf_dir, indent_size=0): 'PDF', indent_size=indent_size) - self.log.info(f'Finished processing "{ead_id}".') + self.log.info(f'{indent}Finished processing "{ead_id}" (status: {job_status}).') return True - self.log.info(f'{indent}Waiting for ArchivesSpace {self.job_type}_{job_id} to complete... (current status: {job_status})') - time.sleep(5) + # Enhanced logging for queued jobs + poll_count += 1 + if job_status == 'queued': + # Show warning if job has been queued for too long + if elapsed_time > warning_threshold and (elapsed_time - last_warning_time) > warning_threshold: + self.log.warning( + f'{indent}Job {self.job_type}_{job_id} has been queued for {int(elapsed_time)} seconds. ' + f'This may indicate the ArchivesSpace background job processor is not running.' 
+ ) + last_warning_time = elapsed_time + + # Only log every 4th poll (every 20 seconds) to reduce log spam + if poll_count % 4 == 0: + self.log.info( + f'{indent}Waiting for ArchivesSpace {self.job_type}_{job_id} ' + f'(status: {job_status}, elapsed: {int(elapsed_time)}s, timeout in: {int(current_timeout - elapsed_time)}s)' + ) + else: + # For non-queued statuses (running, etc.), log every time + self.log.info( + f'{indent}Waiting for ArchivesSpace {self.job_type}_{job_id} ' + f'(status: {job_status}, elapsed: {int(elapsed_time)}s)' + ) + + time.sleep(poll_interval) def update_eads(self): @@ -399,10 +519,11 @@ def update_eads(self): ArchivesSpace. """ xml_dir = f'{self.arclight_dir}/public/xml' + resource_dir = f'{xml_dir}/resources' pdf_dir = f'{self.arclight_dir}/public/pdf' modified_since = int(self.last_updated.timestamp()) - + if self.force_update or modified_since <= 0: modified_since = 0 # delete all EADs and related files in ArcLight Solr @@ -412,7 +533,7 @@ def update_eads(self): json={'delete': {'query': '*:*'}}, ) if response.status_code == 200: - self.log.info('Deleted all EADs from ArcLight Solr.') + self.log.info('Deleted all EADs and Creators from ArcLight Solr.') # delete related directories after suscessful # deletion from solr for dir_path, dir_name in [(xml_dir, 'XMLs'), (pdf_dir, 'PDFs')]: @@ -424,10 +545,10 @@ def update_eads(self): else: self.log.error(f'Failed to delete all EADs from Arclight Solr. 
Status code: {response.status_code}') except requests.exceptions.RequestException as e: - self.log.error(f'Error deleting all EADs from ArcLight Solr: {e}') + self.log.error(f'Error deleting all EADs and Creators from ArcLight Solr: {e}') # create directories if don't exist - for dir_path in (xml_dir, pdf_dir): + for dir_path in (resource_dir, pdf_dir): os.makedirs(dir_path, exist_ok=True) # process resources that have been modified in ArchivesSpace since last update @@ -440,7 +561,7 @@ def update_eads(self): # Tasks for processing repositories results_1 = [pool.apply_async( self.task_repository, - args=(repo, xml_dir, modified_since, indent_size)) + args=(repo, resource_dir, modified_since, indent_size)) for repo in repos] # Collect outputs from repository tasks outputs_1 = [r.get() for r in results_1] @@ -448,7 +569,7 @@ def update_eads(self): # Tasks for processing resources results_2 = [pool.apply_async( self.task_resource, - args=(repo, resource_id, xml_dir, pdf_dir, indent_size)) + args=(repo, resource_id, resource_dir, pdf_dir, indent_size)) for repo, resources in outputs_1 for resource_id in resources] # Collect outputs from resource tasks outputs_2 = [r.get() for r in results_2] @@ -463,7 +584,7 @@ def update_eads(self): # Tasks for indexing pending resources results_3 = [pool.apply_async( self.index_collections, - args=(repo_id, f'{xml_dir}/{repo_id}_*_batch_{batch_num}.xml', indent_size)) + args=(repo_id, f'{resource_dir}/{repo_id}_*_batch_{batch_num}.xml', indent_size)) for repo_id, batch_num in batches] # Wait for indexing tasks to complete @@ -472,7 +593,7 @@ def update_eads(self): # Remove pending symlinks after indexing for repo_id, batch_num in batches: - xml_file_path = f'{xml_dir}/{repo_id}_*_batch_{batch_num}.xml' + xml_file_path = f'{resource_dir}/{repo_id}_*_batch_{batch_num}.xml' try: result = subprocess.run( f'rm {xml_file_path}', @@ -495,14 +616,23 @@ def update_eads(self): for r in results_4: r.get() - # processing deleted resources is not 
needed when - # force-update is set or modified_since is set to 0 - if self.force_update or modified_since <= 0: - self.log.info('Skipping deleted resources processing.') - return + return + + + + def process_deleted_records(self): + + xml_dir = f'{self.arclight_dir}/public/xml' + resource_dir = f'{xml_dir}/resources' + agent_dir = f'{xml_dir}/agents' + pdf_dir = f'{self.arclight_dir}/public/pdf' + modified_since = int(self.last_updated.timestamp()) + + # process records that have been deleted since last update in ArchivesSpace + resource_pattern = r'^/repositories/(?P<repo_id>\d+)/resources/(?P<resource_id>\d+)$' + agent_pattern = r'^/agents/(?P<agent_type>people|corporate_entities|families)/(?P<agent_id>\d+)$' + + - # process resources that have been deleted since last update in ArchivesSpace - pattern = r'^/repositories/(?P<repo_id>\d+)/resources/(?P<resource_id>\d+)$' page = 1 while True: deleted_records = self.client.get( @@ -513,12 +643,13 @@ def update_eads(self): } ).json() for record in deleted_records['results']: - match = re.match(pattern, record) - if match: - resource_id = match.group('resource_id') + resource_match = re.match(resource_pattern, record) + agent_match = re.match(agent_pattern, record) + if resource_match and not self.agents_only: + resource_id = resource_match.group('resource_id') self.log.info(f'{" " * indent_size}Processing deleted resource ID {resource_id}...') - symlink_path = f'{xml_dir}/{resource_id}.xml' + symlink_path = f'{resource_dir}/{resource_id}.xml' ead_id = self.get_ead_from_symlink(symlink_path) if ead_id: self.delete_ead( @@ -526,10 +657,18 @@ def update_eads(self): ead_id.replace('.', '-'), # dashes in Solr f'{xml_dir}/{ead_id}.xml', # dots in filenames f'{pdf_dir}/{ead_id}.pdf', - indent=4) + indent_size=4) else: self.log.error(f'{" " * (indent_size+2)}Symlink {symlink_path} not found. 
Unable to delete the associated EAD from Arclight Solr.') + if agent_match and not self.collections_only: + agent_id = agent_match.group('agent_id') + self.log.info(f'{" " * indent_size}Processing deleted agent ID {agent_id}...') + file_path = f'{agent_dir}/{agent_id}.xml' + agent_solr_id = f'creator_{agent_type}_{agent_id}' + self.delete_creator(file_path, agent_solr_id, indent_size) + + if deleted_records['last_page'] == page: break page += 1 @@ -548,13 +687,13 @@ def index_collections(self, repo_id, xml_file_path, indent_size=0): cwd=self.arclight_dir ) arclight_path = result_show.stdout.strip() if result_show.returncode == 0 else '' - + if not arclight_path: self.log.error(f'{indent}Could not find arclight gem path') return - + traject_config = f'{arclight_path}/lib/arclight/traject/ead2_config.rb' - + xml_files = glob.glob(xml_file_path) # Returns list of matching files cmd = [ 'bundle', 'exec', 'traject', '-u', self.solr_url, @@ -563,21 +702,18 @@ def index_collections(self, repo_id, xml_file_path, indent_size=0): '-s', f'solr_writer.batch_size={self.batch_size}', '-s', 'solr_writer.commit_on_close=true', '-i', 'xml', - '-c', traject_config - ] - + '-c', traject_config, + ] + xml_files + if self.traject_extra_config: if isinstance(self.traject_extra_config, (list, tuple)): cmd.extend(self.traject_extra_config) else: # Treat a string extra config as a path and pass it with -c cmd.extend(['-c', self.traject_extra_config]) - - cmd.append(xml_file_path) - + env = os.environ.copy() env['REPOSITORY_ID'] = str(repo_id) - result = subprocess.run( cmd, cwd=self.arclight_dir, @@ -603,10 +739,10 @@ def get_creator_bioghist(self, resource, indent_size=0): """ indent = ' ' * indent_size bioghist_elements = [] - + if 'linked_agents' not in resource: return None - + # Process linked_agents in order to maintain consistency with origination order for linked_agent in resource['linked_agents']: # Only process agents with 'creator' role @@ -615,10 +751,10 @@ def 
get_creator_bioghist(self, resource, indent_size=0): if agent_ref: try: agent = self.client.get(agent_ref).json() - + # Get agent name for head element agent_name = agent.get('title') or agent.get('display_name', {}).get('sort_name', 'Unknown') - + # Check for notes in the agent record if 'notes' in agent: for note in agent['notes']: @@ -630,7 +766,7 @@ def get_creator_bioghist(self, resource, indent_size=0): self.log.error(f'{indent}**ASSUMPTION VIOLATION**: Expected persistent_id in note_bioghist for agent {agent_ref}') # Skip creating id attribute if persistent_id is missing persistent_id = None - + # Extract note content from subnotes paragraphs = [] if 'subnotes' in note: @@ -652,7 +788,7 @@ def get_creator_bioghist(self, resource, indent_size=0): # Wrap each line in

<p> tags for line in lines: paragraphs.append(f'<p>{line}</p>
') - + # Create nested bioghist element if we have paragraphs if paragraphs: paragraphs_xml = '\n'.join(paragraphs) @@ -665,7 +801,7 @@ def get_creator_bioghist(self, resource, indent_size=0): bioghist_elements.append(bioghist_el) except Exception as e: self.log.error(f'{indent}Error fetching biographical information for agent {agent_ref}: {e}') - + if bioghist_elements: # Return the agent bioghist elements (unwrapped) # The caller will decide whether to wrap them based on whether @@ -673,80 +809,153 @@ def get_creator_bioghist(self, resource, indent_size=0): return '\n'.join(bioghist_elements) return None + def _get_target_agent_criteria(self, modified_since=0): + """ + Defines the Solr query criteria for "target" agents. + These are agents we want to process. + """ + criteria = [ + "linked_agent_roles:creator", + "system_generated:false", + "is_user:false", +# "is_repo_agent:false", + ] - def get_all_agents(self, agent_types=None, modified_since=0, indent_size=0): + # Add time filter if applicable + if modified_since > 0 and not self.force_update: + mtime_utc = datetime.fromtimestamp(modified_since, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + criteria.append(f"system_mtime:[{mtime_utc} TO *]") + + return criteria + + def _get_nontarget_agent_criteria(self, modified_since=0): """ - Fetch ALL agents from ArchivesSpace (not just creators). - Uses direct agent API endpoints for comprehensive coverage. - + Defines the Solr query criteria for "non-target" (excluded) agents. + This is the logical inverse of the target criteria. 
+ """ + # The core logic for what makes an agent a "target" + target_logic = " AND ".join([ + "linked_agent_roles:creator", + "system_generated:false", + "is_user:false", +# "is_repo_agent:false", + ]) + + # We find non-targets by negating the entire block of target logic + criteria = [f"NOT ({target_logic})"] + + # We still apply the time filter to the overall query + if modified_since > 0 and not self.force_update: + mtime_utc = datetime.fromtimestamp(modified_since, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + criteria.append(f"system_mtime:[{mtime_utc} TO *]") + + return criteria + + def _execute_solr_query(self, query_parts, solr_url=None, fields=['id'], indent_size=0): + """ + A generic function to execute a query against the Solr index. + Args: - agent_types: List of agent types to fetch. Default: ['corporate_entities', 'people', 'families'] - modified_since: Unix timestamp to filter agents modified since this time (if API supports it) - indent_size: Indentation size for logging - + query_parts (list): A list of strings that will be joined with " AND ". + fields (list): A list of Solr fields to return in the response. + Returns: - set: Set of agent URIs (e.g., '/agents/corporate_entities/123') + list: A list of dictionaries, where each dictionary contains the requested fields. + Returns an empty list on failure. 
+ """ + indent = ' ' * indent_size + if not query_parts: + self.log.error("Cannot execute Solr query with empty criteria.") + return [] + + if not solr_url: + solr_url = self.solr_url + + query_string = " AND ".join(query_parts) + self.log.info(f"{indent}Executing Solr query: {query_string}") + + try: + # First, get the total count of matching documents + count_params = {'q': query_string, 'rows': 0, 'wt': 'json'} + count_response = requests.get(f'{solr_url}/select', params=count_params) + self.log.info(f" [Solr Count Request]: {count_response.request.url}") + + count_response.raise_for_status() + num_found = count_response.json()['response']['numFound'] + + if num_found == 0: + return [] # No need to query again if nothing was found + + # Now, fetch the actual data for the documents + data_params = { + 'q': query_string, + 'rows': num_found, # Use the exact count to fetch all results + 'fl': ','.join(fields), # Join field list into a comma-separated string + 'wt': 'json' + } + response = requests.get(f'{solr_url}/select', params=data_params) + response.raise_for_status() + # Log the exact URL for the data request + self.log.info(f" [Solr Data Request]: {response.request.url}") + + return response.json()['response']['docs'] + + except requests.exceptions.RequestException as e: + self.log.error(f"Failed to execute Solr query: {e}") + self.log.error(f" Failed query string: {query_string}") + return [] + + def get_all_agents(self, agent_types=None, modified_since=0, indent_size=0): + """ + Fetch target agent URIs from the Solr index and log non-target agents. 
""" if agent_types is None: - agent_types = ['corporate_entities', 'people', 'families'] - + agent_types = ['agent_person', 'agent_corporate_entity', 'agent_family'] + + if self.force_update: + modified_since = 0 indent = ' ' * indent_size - all_agents = set() - - self.log.info(f'{indent}Fetching ALL agents from ArchivesSpace...') - - for agent_type in agent_types: - try: - # Try with modified_since parameter first - params = {'all_ids': True} - if modified_since > 0: - params['modified_since'] = modified_since - - response = self.client.get(f'/agents/{agent_type}', params=params) - agent_ids = response.json() - - self.log.info(f'{indent}Found {len(agent_ids)} {agent_type} agents') - - # Add agent URIs to set - for agent_id in agent_ids: - agent_uri = f'/agents/{agent_type}/{agent_id}' - all_agents.add(agent_uri) - - except Exception as e: - self.log.error(f'{indent}Error fetching {agent_type} agents: {e}') - # If modified_since fails, try without it - if modified_since > 0: - self.log.warning(f'{indent}Retrying {agent_type} without modified_since filter...') - try: - response = self.client.get(f'/agents/{agent_type}', params={'all_ids': True}) - agent_ids = response.json() - self.log.info(f'{indent}Found {len(agent_ids)} {agent_type} agents (no date filter)') - for agent_id in agent_ids: - agent_uri = f'/agents/{agent_type}/{agent_id}' - all_agents.add(agent_uri) - except Exception as e2: - self.log.error(f'{indent}Failed to fetch {agent_type} agents: {e2}') - - self.log.info(f'{indent}Found {len(all_agents)} total agents across all types.') - return all_agents + self.log.info(f'{indent}Fetching agent data from Solr...') + # Base criteria for all queries in this function + base_criteria = [f"primary_type:({' OR '.join(agent_types)})"] + + # Get and log the non-target agents + nontarget_criteria = base_criteria + self._get_nontarget_agent_criteria(modified_since) + excluded_docs = self._execute_solr_query(nontarget_criteria,self.aspace_solr_url, fields=['id']) + if 
excluded_docs: + excluded_ids = [doc['id'] for doc in excluded_docs] + self.log.info(f"{indent} Found {len(excluded_ids)} non-target (excluded) agents.") + # Optional: Log the actual IDs if the list isn't too long + # for agent_id in excluded_ids: + # self.log.debug(f"{indent} - Excluded: {agent_id}") + + # Get and return the target agents + target_criteria = base_criteria + self._get_target_agent_criteria(modified_since) + self.log.info('Target Criteria:') + target_docs = self._execute_solr_query(target_criteria, self.aspace_solr_url, fields=['id']) + + target_agents = [doc['id'] for doc in target_docs] + self.log.info(f"{indent} Found {len(target_agents)} target agents to process.") + + return target_agents def task_agent(self, agent_uri, agents_dir, repo_id=1, indent_size=0): """ Process a single agent and generate a creator document in EAC-CPF XML format. Retrieves EAC-CPF directly from ArchivesSpace archival_contexts endpoint. - + Args: agent_uri: Agent URI from ArchivesSpace (e.g., '/agents/corporate_entities/123') agents_dir: Directory to save agent XML files repo_id: Repository ID to use for archival_contexts endpoint (default: 1) indent_size: Indentation size for logging - + Returns: str: Creator document ID if successful, None otherwise """ indent = ' ' * indent_size - + try: # Parse agent URI to extract type and ID # URI format: /agents/{agent_type}/{id} @@ -754,25 +963,25 @@ def task_agent(self, agent_uri, agents_dir, repo_id=1, indent_size=0): if len(parts) != 3 or parts[0] != 'agents': self.log.error(f'{indent}Invalid agent URI format: {agent_uri}') return None - + agent_type = parts[1] # e.g., 'corporate_entities', 'people', 'families' agent_id = parts[2] - + # Construct EAC-CPF endpoint # Format: /repositories/{repo_id}/archival_contexts/{agent_type}/{id}.xml eac_cpf_endpoint = f'/repositories/{repo_id}/archival_contexts/{agent_type}/{agent_id}.xml' - + self.log.debug(f'{indent}Fetching EAC-CPF from: {eac_cpf_endpoint}') - + # Fetch EAC-CPF XML 
response = self.client.get(eac_cpf_endpoint) - + if response.status_code != 200: self.log.error(f'{indent}Failed to fetch EAC-CPF for {agent_uri}: HTTP {response.status_code}') return None - + eac_cpf_xml = response.text - + # Parse the EAC-CPF XML to validate and inspect its structure try: root = ET.fromstring(eac_cpf_xml) @@ -780,18 +989,18 @@ def task_agent(self, agent_uri, agents_dir, repo_id=1, indent_size=0): except ET.ParseError as e: self.log.error(f'{indent}Failed to parse EAC-CPF XML for {agent_uri}: {e}') return None - + # Generate creator ID creator_id = f'creator_{agent_type}_{agent_id}' - + # Save EAC-CPF XML to file filename = f'{agents_dir}/{creator_id}.xml' with open(filename, 'w', encoding='utf-8') as f: f.write(eac_cpf_xml) - + self.log.info(f'{indent}Created creator document: {creator_id}') return creator_id - + except Exception as e: self.log.error(f'{indent}Error processing agent {agent_uri}: {e}') import traceback @@ -844,14 +1053,15 @@ def process_creators(self): self.log.info(f'{indent}Indexing {len(creator_ids)} creator records to Solr...') traject_config = self.find_traject_config() if traject_config: + self.log.info(f'{indent}Using traject config: {traject_config}') indexed = self.index_creators(agents_dir, creator_ids) self.log.info(f'{indent}Creator indexing complete: {indexed}/{len(creator_ids)} indexed') else: - self.log.info(f'{indent}Skipping creator indexing (traject config not found)') + self.log.warning(f'{indent}Skipping creator indexing (traject config not found)') self.log.info(f'{indent}To index manually:') self.log.info(f'{indent} cd {self.arclight_dir}') self.log.info(f'{indent} bundle exec traject -u {self.solr_url} -i xml \\') - self.log.info(f'{indent} -c /path/to/arcuit/arcflow/traject_config_eac_cpf.rb \\') + self.log.info(f'{indent} -c /path/to/arcuit-gem/traject_config_eac_cpf.rb \\') self.log.info(f'{indent} {agents_dir}/*.xml') elif self.skip_creator_indexing: self.log.info(f'{indent}Skipping creator indexing 
(--skip-creator-indexing flag set)') @@ -862,16 +1072,33 @@ def process_creators(self): def find_traject_config(self): """ Find the traject config for creator indexing. - - Tries: - 1. bundle show arcuit (finds installed gem) - 2. self.arcuit_dir (explicit path) - 3. Returns None if neither works - + + Search order (follows collection records pattern): + 1. arcuit_dir if provided (most up-to-date user control) + 2. arcuit gem via bundle show (for backward compatibility) + 3. example_traject_config_eac_cpf.rb in arcflow (fallback when used as module without arcuit) + Returns: str: Path to traject config, or None if not found """ - # Try bundle show arcuit first + self.log.info('Searching for traject_config_eac_cpf.rb...') + searched_paths = [] + + # Try 1: arcuit_dir if provided (highest priority - user's explicit choice) + if self.arcuit_dir: + self.log.debug(f' Checking arcuit_dir parameter: {self.arcuit_dir}') + candidate_paths = [ + os.path.join(self.arcuit_dir, 'traject_config_eac_cpf.rb'), + os.path.join(self.arcuit_dir, 'lib', 'arcuit', 'traject', 'traject_config_eac_cpf.rb'), + ] + searched_paths.extend(candidate_paths) + for traject_config in candidate_paths: + if os.path.exists(traject_config): + self.log.info(f'✓ Using traject config from arcuit_dir: {traject_config}') + return traject_config + self.log.debug(' traject_config_eac_cpf.rb not found in arcuit_dir') + + # Try 2: bundle show arcuit (for backward compatibility when arcuit_dir not provided) try: result = subprocess.run( ['bundle', 'show', 'arcuit'], @@ -882,77 +1109,84 @@ def find_traject_config(self): ) if result.returncode == 0: arcuit_path = result.stdout.strip() - # Prefer config at gem root, fall back to legacy subdirectory layout + self.log.debug(f' Found arcuit gem at: {arcuit_path}') candidate_paths = [ os.path.join(arcuit_path, 'traject_config_eac_cpf.rb'), - os.path.join(arcuit_path, 'arcflow', 'traject_config_eac_cpf.rb'), + os.path.join(arcuit_path, 'lib', 'arcuit', 'traject', 
'traject_config_eac_cpf.rb'), ] + searched_paths.extend(candidate_paths) for traject_config in candidate_paths: if os.path.exists(traject_config): - self.log.info(f'Found traject config via bundle show: {traject_config}') + self.log.info(f'✓ Using traject config from arcuit gem: {traject_config}') return traject_config - self.log.warning( - 'bundle show arcuit succeeded but traject_config_eac_cpf.rb ' - 'was not found in any expected location under the gem root' + self.log.debug( + ' traject_config_eac_cpf.rb not found in arcuit gem ' + '(checked root and lib/arcuit/traject/ subdirectory)' ) else: - self.log.debug('bundle show arcuit failed (gem not installed?)') + self.log.debug(' arcuit gem not found via bundle show') except Exception as e: - self.log.debug(f'Error running bundle show arcuit: {e}') - # Fall back to arcuit_dir if provided - if self.arcuit_dir: - candidate_paths = [ - os.path.join(self.arcuit_dir, 'traject_config_eac_cpf.rb'), - os.path.join(self.arcuit_dir, 'arcflow', 'traject_config_eac_cpf.rb'), - ] - for traject_config in candidate_paths: - if os.path.exists(traject_config): - self.log.info(f'Using traject config from arcuit_dir: {traject_config}') - return traject_config - self.log.warning( - 'arcuit_dir provided but traject_config_eac_cpf.rb was not found ' - 'in any expected location' + self.log.debug(f' Error checking for arcuit gem: {e}') + + # Try 3: example file in arcflow package (fallback for module usage without arcuit) + # We know exactly where this file is located - at the repo root + arcflow_package_dir = os.path.dirname(os.path.abspath(__file__)) + arcflow_repo_root = os.path.dirname(arcflow_package_dir) + traject_config = os.path.join(arcflow_repo_root, 'example_traject_config_eac_cpf.rb') + searched_paths.append(traject_config) + + if os.path.exists(traject_config): + self.log.info(f'✓ Using example traject config from arcflow: {traject_config}') + self.log.info( + ' Note: Using example config. 
For production, copy this file to your ' + 'arcuit gem or specify location with --arcuit-dir.' ) - # No config found - self.log.warning('Could not find traject config (bundle show arcuit failed and arcuit_dir not provided or invalid)') + return traject_config + + # No config found anywhere - show all paths searched + self.log.error('✗ Could not find traject_config_eac_cpf.rb in any of these locations:') + for i, path in enumerate(searched_paths, 1): + self.log.error(f' {i}. {path}') + self.log.error('') + self.log.error(' Add traject_config_eac_cpf.rb to your arcuit gem or specify with --arcuit-dir.') return None def index_creators(self, agents_dir, creator_ids, batch_size=100): """ Index creator XML files to Solr using traject. - + Args: agents_dir: Directory containing creator XML files creator_ids: List of creator IDs to index batch_size: Number of files to index per traject call (default: 100) - + Returns: int: Number of successfully indexed creators """ traject_config = self.find_traject_config() if not traject_config: return 0 - + indexed_count = 0 failed_count = 0 - + # Process in batches to avoid command line length limits total_batches = math.ceil(len(creator_ids) / batch_size) for i in range(0, len(creator_ids), batch_size): batch = creator_ids[i:i+batch_size] batch_num = (i // batch_size) + 1 - + # Build list of XML files for this batch xml_files = [f'{agents_dir}/{cid}.xml' for cid in batch] - + # Filter to only existing files existing_files = [f for f in xml_files if os.path.exists(f)] - + if not existing_files: self.log.warning(f' Batch {batch_num}/{total_batches}: No files found, skipping') continue - + try: cmd = [ 'bundle', 'exec', 'traject', @@ -960,16 +1194,16 @@ def index_creators(self, agents_dir, creator_ids, batch_size=100): '-i', 'xml', '-c', traject_config ] + existing_files - + self.log.info(f' Indexing batch {batch_num}/{total_batches}: {len(existing_files)} files') - + result = subprocess.run( cmd, cwd=self.arclight_dir, 
stderr=subprocess.PIPE, timeout=300 # 5 minute timeout per batch ) - + if result.returncode == 0: indexed_count += len(existing_files) self.log.info(f' Successfully indexed {len(existing_files)} creators') @@ -978,7 +1212,7 @@ def index_creators(self, agents_dir, creator_ids, batch_size=100): self.log.error(f' Traject failed with exit code {result.returncode}') if result.stderr: self.log.error(f' STDERR: {result.stderr.decode("utf-8")}') - + except subprocess.TimeoutExpired: self.log.error(f' Traject timed out for batch {batch_num}/{total_batches}') failed_count += len(existing_files) @@ -988,7 +1222,7 @@ def index_creators(self, agents_dir, creator_ids, batch_size=100): if failed_count > 0: self.log.warning(f'Creator indexing completed with errors: {indexed_count} succeeded, {failed_count} failed') - + return indexed_count @@ -1068,37 +1302,49 @@ def create_symlink(self, target_path, symlink_path, indent_size=0): self.log.info(f'{indent}{e}') return False - - def delete_ead(self, resource_id, ead_id, - xml_file_path, pdf_file_path, indent_size=0): + def delete_arclight_solr_record(self, solr_record_id, indent_size=0): indent = ' ' * indent_size - # delete from solr + try: response = requests.post( f'{self.solr_url}/update?commit=true', - json={'delete': {'id': ead_id}}, + json={'delete': {'id': solr_record_id}}, ) if response.status_code == 200: - self.log.info(f'{indent}Deleted EAD "{ead_id}" from ArcLight Solr.') - # delete related files after suscessful deletion from solr - for file_path in (xml_file_path, pdf_file_path): - try: - os.remove(file_path) - self.log.info(f'{indent}Deleted file {file_path}.') - except FileNotFoundError: - self.log.error(f'{indent}File {file_path} not found.') - - # delete symlink if exists - symlink_path = f'{os.path.dirname(xml_file_path)}/{resource_id}.xml' - try: - os.remove(symlink_path) - self.log.info(f'{indent}Deleted symlink {symlink_path}.') - except FileNotFoundError: - self.log.info(f'{indent}Symlink {symlink_path} not 
found.') + self.log.info(f'{indent}Deleted Solr record {solr_record_id} from ArcLight Solr.') + return True else: - self.log.error(f'{indent}Failed to delete EAD "{ead_id}" from Arclight Solr. Status code: {response.status_code}') + self.log.error( + f'{indent}Failed to delete Solr record {solr_record_id} from ArcLight Solr. Status code: {response.status_code}') + return False except requests.exceptions.RequestException as e: - self.log.error(f'{indent}Error deleting EAD "{ead_id}" from ArcLight Solr: {e}') + self.log.error(f'{indent}Error deleting Solr record {solr_record_id} from ArcLight Solr: {e}') + + def delete_file(self, file_path, indent_size=0): + indent = ' ' * indent_size + + try: + os.remove(file_path) + self.log.info(f'{indent}Deleted file {file_path}.') + except FileNotFoundError: + self.log.error(f'{indent}File {file_path} not found.') + + def delete_ead(self, resource_id, ead_id, + xml_file_path, pdf_file_path, indent_size=0): + # delete from solr + deleted_solr_record = self.delete_arclight_solr_record(ead_id, indent_size=indent_size) + if deleted_solr_record: + self.delete_file(pdf_file_path, indent_size=indent_size) + self.delete_file(xml_file_path, indent_size=indent_size) + # delete symlink if exists + symlink_path = f'{os.path.dirname(xml_file_path)}/{resource_id}.xml' + self.delete_file(symlink_path, indent_size=indent_size) + + def delete_creator(self, file_path, solr_id, indent_size=0): + deleted_solr_record = self.delete_arclight_solr_record(solr_id, indent_size=indent_size) + if deleted_solr_record: + self.delete_file(file_path, indent_size=indent_size) + def save_config_file(self): @@ -1120,23 +1366,30 @@ def run(self): Run the ArcFlow process.
""" self.log.info(f'ArcFlow process started (PID: {self.pid}).') - + # Update repositories (unless agents-only mode) if not self.agents_only: self.update_repositories() - + # Update collections/EADs (unless agents-only mode) if not self.agents_only: self.update_eads() - + # Update creator records (unless collections-only mode) if not self.collections_only: self.process_creators() - + + # processing deleted resources is not needed when + # force-update is set or modified_since is set to 0 + if self.force_update or int(self.last_updated.timestamp()) <= 0: + self.log.info('Skipping deleted record processing.') + else: + self.process_deleted_records() + self.save_config_file() self.log.info(f'ArcFlow process completed (PID: {self.pid}). Elapsed time: {time.strftime("%H:%M:%S", time.gmtime(int(time.time()) - self.start_time))}.') - + def main(): @@ -1156,7 +1409,11 @@ def main(): parser.add_argument( '--solr-url', required=True, - help='URL of the Solr core',) + help='URL of the ArcLight Solr core',) + parser.add_argument( + '--aspace-solr-url', + required=True, + help='URL of the ASpace Solr core',) parser.add_argument( '--traject-extra-config', default='', @@ -1177,22 +1434,35 @@ def main(): '--skip-creator-indexing', action='store_true', help='Generate creator XML files but skip Solr indexing (for testing)',) + parser.add_argument( + '--pdf-timeout-queued', + type=int, + default=300, + help='Timeout in seconds for PDF jobs stuck in "queued" status (default: 300 = 5 minutes)',) + parser.add_argument( + '--pdf-timeout-running', + type=int, + default=1800, + help='Timeout in seconds for PDF jobs in "running" status (default: 1800 = 30 minutes)',) args = parser.parse_args() - + + # Validate mutually exclusive flags if args.agents_only and args.collections_only: parser.error('Cannot use both --agents-only and --collections-only') - arcflow = ArcFlow( arclight_dir=args.arclight_dir, aspace_dir=args.aspace_dir, solr_url=args.solr_url, + aspace_solr_url=args.aspace_solr_url, 
traject_extra_config=args.traject_extra_config, force_update=args.force_update, agents_only=args.agents_only, collections_only=args.collections_only, arcuit_dir=args.arcuit_dir, - skip_creator_indexing=args.skip_creator_indexing) + skip_creator_indexing=args.skip_creator_indexing, + pdf_timeout_queued=args.pdf_timeout_queued, + pdf_timeout_running=args.pdf_timeout_running) arcflow.run() diff --git a/traject_config_eac_cpf.rb b/example_traject_config_eac_cpf.rb similarity index 59% rename from traject_config_eac_cpf.rb rename to example_traject_config_eac_cpf.rb index 62c9a5a..52d0050 100644 --- a/traject_config_eac_cpf.rb +++ b/example_traject_config_eac_cpf.rb @@ -4,7 +4,9 @@ # Persons, and Families) XML documents from ArchivesSpace archival_contexts endpoint. # # Usage: -# bundle exec traject -u $SOLR_URL -c traject_config_eac_cpf.rb /path/to/agents/*.xml +# bundle exec traject -u $SOLR_URL -c example_traject_config_eac_cpf.rb /path/to/agents/*.xml +# +# For production, copy this file to your arcuit gem as traject_config_eac_cpf.rb # # The EAC-CPF XML documents are retrieved directly from ArchivesSpace via: # /repositories/{repo_id}/archival_contexts/{agent_type}/{id}.xml @@ -20,6 +22,12 @@ # EAC-CPF namespace - used consistently throughout this config EAC_NS = { 'eac' => 'urn:isbn:1-931666-33-4' } +# Pattern matching arcflow's creator file naming: creator_{entity_type}_{id} +CREATOR_ID_PATTERN = /^creator_(corporate_entities|people|families)_\d+$/ + +# Entity types - SINGLE SOURCE OF TRUTH +ENTITY_TYPES = ['corporate_entities', 'people', 'families'] + settings do provide "solr.url", ENV['SOLR_URL'] || "http://localhost:8983/solr/blacklight-core" provide "solr_writer.commit_on_close", "true" @@ -36,77 +44,21 @@ context.clipboard[:is_creator] = true end -# Core identity field -# CRITICAL: The 'id' field is required by Solr's schema (uniqueKey) -# Must ensure this field is never empty or indexing will fail -# -# IMPORTANT: Real EAC-CPF from ArchivesSpace has empty 
element! -# Cannot rely on recordId being present. Must extract from filename or generate. +# Solr uniqueKey - extract ID from filename using arcflow's creator_{entity_type}_{id} pattern to_field 'id' do |record, accumulator, context| - # Try 1: Extract from control/recordId (if present) - record_id = record.xpath('//eac:control/eac:recordId', EAC_NS).first - record_id ||= record.xpath('//control/recordId').first - - if record_id && !record_id.text.strip.empty? - accumulator << record_id.text.strip - else - # Try 2: Extract from source filename (most reliable for ArchivesSpace exports) - # Filename format: creator_corporate_entities_584.xml or similar - source_file = context.source_record_id || context.input_name - if source_file - # Remove .xml extension and any path - id_from_filename = File.basename(source_file, '.xml') - # Check if it looks valid (starts with creator_ or agent_) - if id_from_filename =~ /^(creator_|agent_)/ - accumulator << id_from_filename - context.logger.info("Using filename-based ID: #{id_from_filename}") - else - # Try 3: Generate from entity type and name - entity_type = record.xpath('//eac:cpfDescription/eac:identity/eac:entityType', EAC_NS).first&.text&.strip - name_entry = record.xpath('//eac:cpfDescription/eac:identity/eac:nameEntry/eac:part', EAC_NS).first&.text&.strip - - if entity_type && name_entry - # Create stable ID from type and name - type_short = case entity_type - when 'corporateBody' then 'corporate' - when 'person' then 'person' - when 'family' then 'family' - else 'entity' - end - name_id = name_entry.gsub(/[^a-z0-9]/i, '_').downcase[0..50] # Limit length - generated_id = "creator_#{type_short}_#{name_id}" - accumulator << generated_id - context.logger.warn("Generated ID from name: #{generated_id}") - else - # Last resort: timestamp-based unique ID - fallback_id = "creator_unknown_#{Time.now.to_i}_#{rand(10000)}" - accumulator << fallback_id - context.logger.error("Using fallback ID: #{fallback_id}") - end - end + 
source_file = context.source_record_id || context.input_name + if source_file + id_from_filename = File.basename(source_file, '.xml') + if id_from_filename =~ CREATOR_ID_PATTERN + accumulator << id_from_filename + context.logger.info("Using filename-based ID: #{id_from_filename}") else - # No filename available, generate from name - entity_type = record.xpath('//eac:cpfDescription/eac:identity/eac:entityType', EAC_NS).first&.text&.strip - name_entry = record.xpath('//eac:cpfDescription/eac:identity/eac:nameEntry/eac:part', EAC_NS).first&.text&.strip - - if entity_type && name_entry - type_short = case entity_type - when 'corporateBody' then 'corporate' - when 'person' then 'person' - when 'family' then 'family' - else 'entity' - end - name_id = name_entry.gsub(/[^a-z0-9]/i, '_').downcase[0..50] - generated_id = "creator_#{type_short}_#{name_id}" - accumulator << generated_id - context.logger.warn("Generated ID from name: #{generated_id}") - else - # Absolute last resort - fallback_id = "creator_unknown_#{Time.now.to_i}_#{rand(10000)}" - accumulator << fallback_id - context.logger.error("Using fallback ID: #{fallback_id}") - end + context.logger.error("Filename doesn't match expected pattern 'creator_{type}_{id}': #{id_from_filename}") + context.skip!("Invalid ID format in filename") end + else + context.logger.error("No source filename available for record") + context.skip!("Missing source filename") end end @@ -115,13 +67,13 @@ accumulator << 'true' end -# Record type -to_field 'record_type' do |record, accumulator| - accumulator << 'creator' -end +# # Record type +# to_field 'record_type' do |record, accumulator| +# accumulator << 'creator' +# end # Entity type (corporateBody, person, family) -to_field 'entity_type' do |record, accumulator| +to_field 'entity_type_ssi' do |record, accumulator| entity = record.xpath('//eac:cpfDescription/eac:identity/eac:entityType', EAC_NS).first accumulator << entity.text if entity end @@ -188,7 +140,8 @@ # Extract HTML for 
searchable content (matches ArcLight's bioghist_html_tesm) bioghist = record.xpath('//eac:cpfDescription/eac:description/eac:biogHist//eac:p', EAC_NS) if bioghist.any? - html = bioghist.map { |p| "

<p>#{p.text}</p>
" }.join("\n") + # Preserve inline EAC markup inside <p> by serializing child nodes + html = bioghist.map { |p| "
<p>#{p.inner_html}</p>

" }.join("\n") accumulator << html end end @@ -210,26 +163,25 @@ accumulator << bioghist.map(&:text).join(' ') if bioghist.any? end -# Related agents (from cpfRelation elements) -to_field 'related_agents_ssim' do |record, accumulator| +# Related agents (from cpfRelation elements) for display parsing and debugging, stored as a single line +# "https://archivesspace-stage.library.illinois.edu/agents/corporate_entities/57|associative" +to_field 'related_agents_debug_ssim' do |record, accumulator| relations = record.xpath('//eac:cpfDescription/eac:relations/eac:cpfRelation', EAC_NS) relations.each do |rel| - # Get the related entity href/identifier href = rel['href'] || rel['xlink:href'] relation_type = rel['cpfRelationType'] - + if href - # Store as: "uri|type" for easy parsing later - accumulator << "#{href}|#{relation_type}" - elsif relation_entry = rel.xpath('eac:relationEntry', EAC_NS).first - # If no href, at least store the name - name = relation_entry.text - accumulator << "#{name}|#{relation_type}" if name + solr_id = aspace_uri_to_solr_id(href) + if solr_id + # Format: "solr_id|type" + accumulator << "#{solr_id}|#{relation_type || 'unknown'}" + end end end end -# Related agents - just URIs (for simpler queries) +# Related agents - ASpace URIs, in parallel array to match ids and types to_field 'related_agent_uris_ssim' do |record, accumulator| relations = record.xpath('//eac:cpfDescription/eac:relations/eac:cpfRelation', EAC_NS) relations.each do |rel| @@ -238,7 +190,31 @@ end end -# Relationship types +# Related agents - Parallel array of relationship ids to match relationship types and uris +to_field 'related_agent_ids_ssim' do |record, accumulator| + relations = record.xpath('//eac:cpfDescription/eac:relations/eac:cpfRelation', EAC_NS) + relations.each do |rel| + href = rel['href'] || rel['xlink:href'] + if href + solr_id = aspace_uri_to_solr_id(href) # CONVERT URI TO ID + accumulator << solr_id if solr_id + end + end +end + +# Related Agents - Parallel 
array of relationship types to match relationship ids and uris +to_field 'related_agent_relationship_types_ssim' do |record, accumulator| + relations = record.xpath('//eac:cpfDescription/eac:relations/eac:cpfRelation', EAC_NS) + relations.each do |rel| + href = rel['href'] || rel['xlink:href'] + if href + relation_type = rel['cpfRelationType'] || 'unknown' + accumulator << relation_type # NO deduplication - keeps array parallel + end + end +end + +# Relationship types used for faceting, to_field 'relationship_types_ssim' do |record, accumulator| relations = record.xpath('//eac:cpfDescription/eac:relations/eac:cpfRelation', EAC_NS) relations.each do |rel| @@ -248,7 +224,7 @@ end # Agent source URI (from original ArchivesSpace) -to_field 'agent_uri' do |record, accumulator| +to_field 'agent_uri_ssi' do |record, accumulator| # Try to extract from control section or otherRecordId other_id = record.xpath('//eac:control/eac:otherRecordId[@localType="archivesspace_uri"]', EAC_NS).first if other_id @@ -261,10 +237,10 @@ accumulator << Time.now.utc.iso8601 end -# Document type marker -to_field 'document_type' do |record, accumulator| - accumulator << 'creator' -end +# # Document type marker +# to_field 'document_type' do |record, accumulator| +# accumulator << 'creator' +# end # Log successful indexing each_record do |record, context| @@ -273,3 +249,27 @@ context.logger.info("Indexed creator: #{record_id.text}") end end + + + + +# Pattern matching arcflow's creator file naming: creator_{entity_type}_{id} +CREATOR_ID_PATTERN = /^creator_(#{ENTITY_TYPES.join('|')})_\d+$/ + +# Helper to build and validate creator IDs +def build_creator_id(entity_type, id_number) + creator_id = "creator_#{entity_type}_#{id_number}" + unless creator_id =~ CREATOR_ID_PATTERN + raise ArgumentError, "Invalid creator ID: #{creator_id} doesn't match pattern" + end + creator_id +end + +# Helper to convert ArchivesSpace URI to Solr creator ID +def aspace_uri_to_solr_id(uri) + return nil unless uri + # 
Match: /agents/{type}/{id} or https://.../agents/{type}/{id} + if uri =~ /agents\/(#{ENTITY_TYPES.join('|')})\/(\d+)/ + build_creator_id($1, $2) + end +end \ No newline at end of file
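
The diff relies on one naming contract in two places: `task_agent()` writes files named `creator_{agent_type}_{agent_id}.xml`, and the traject config derives Solr IDs from those filenames and from related-agent URIs via `aspace_uri_to_solr_id`. The following is a minimal Python sketch of that same mapping, useful for checking the contract outside of traject. It is not part of the change set: the function names `creator_id_from_uri` and `is_valid_creator_id` are hypothetical, and the patterns are only assumed to mirror `ENTITY_TYPES` and `CREATOR_ID_PATTERN` from the Ruby config.

```python
import re

# Assumed to mirror ENTITY_TYPES in the traject config shown in the diff.
ENTITY_TYPES = ("corporate_entities", "people", "families")
_TYPES_ALTERNATION = "|".join(ENTITY_TYPES)

# Same shape as CREATOR_ID_PATTERN: creator_{entity_type}_{numeric id}.
CREATOR_ID_RE = re.compile(r"creator_(%s)_\d+" % _TYPES_ALTERNATION)

# Finds agents/{type}/{id} in either a relative or an absolute URI.
AGENT_URI_RE = re.compile(r"agents/(%s)/(\d+)" % _TYPES_ALTERNATION)


def is_valid_creator_id(creator_id):
    """Return True if the whole string matches the creator ID pattern."""
    return CREATOR_ID_RE.fullmatch(creator_id) is not None


def creator_id_from_uri(uri):
    """Hypothetical helper: map an ArchivesSpace agent URI to a creator ID.

    Returns None for empty input or for agent types outside ENTITY_TYPES
    (for example software agents).
    """
    if not uri:
        return None
    match = AGENT_URI_RE.search(uri)
    if match is None:
        return None
    return f"creator_{match.group(1)}_{match.group(2)}"


if __name__ == "__main__":
    for uri in ("/agents/corporate_entities/123", "/agents/software/1"):
        print(uri, "->", creator_id_from_uri(uri))
```

Keeping the list of entity types in a single tuple and building both patterns from it follows the "single source of truth" intent stated in the Ruby config.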