Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ build/*

# AI Agent files
AGENTS.md
CLAUDE.md

# Provenance related
provenance.yml
provenance_graph.yml
provenance.svg
*.dot
21 changes: 21 additions & 0 deletions arc/job/pipe/pipe_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,16 @@ def ingest_pipe_results(self, pipe: PipeRun) -> None:
if state.status == TaskState.COMPLETED.value:
ingest_completed_task(pipe.run_id, pipe.pipe_root, spec, state,
self.sched.species_dict, self.sched.output)
self._update_graph_for_pipe_task(spec, status='done')
elif state.status == TaskState.FAILED_ESS.value:
self._eject_to_scheduler(pipe, spec, state)
self._update_graph_for_pipe_task(spec, status='errored')
ejected_count += 1
elif state.status == TaskState.FAILED_TERMINAL.value:
logger.error(f'Pipe run {pipe.run_id}, task {spec.task_id}: '
f'failed terminally (failure_class={state.failure_class}). '
f'Manual troubleshooting required.')
self._update_graph_for_pipe_task(spec, status='errored')
elif state.status == TaskState.CANCELLED.value:
logger.warning(f'Pipe run {pipe.run_id}, task {spec.task_id}: '
f'was cancelled.')
Expand All @@ -290,6 +293,24 @@ def ingest_pipe_results(self, pipe: PipeRun) -> None:
else:
self._post_ingest_pipe_run(pipe)

def _update_graph_for_pipe_task(self, spec: TaskSpec, status: str) -> None:
    """
    Mark the provenance-graph calculation node of a finished pipe task.

    Args:
        spec (TaskSpec): The pipe task specification being ingested.
        status (str): The status to record on the calc node (e.g. 'done', 'errored').
    """
    provenance_graph = getattr(self.sched, 'graph', None)
    if provenance_graph is None:
        return
    metadata = spec.ingestion_metadata or {}
    base_job_type = TASK_FAMILY_TO_JOB_TYPE.get(spec.task_family, spec.task_family)
    conformer_index = metadata.get('conformer_index')
    # Reconstruct the job_name the scheduler would have used for this task;
    # fall back to the pipe task_id when no conformer index is available.
    if conformer_index is None:
        job_name = spec.task_id  # fallback to pipe task_id
    else:
        job_name = f'{base_job_type}_{conformer_index}'
    node_id = provenance_graph.find_calc_node(spec.owner_key, job_name)
    if node_id is not None:
        provenance_graph.update_node(node_id, status=status)

def _post_ingest_pipe_run(self, pipe: PipeRun) -> None:
"""
Trigger family-specific post-processing after all tasks in a pipe run
Expand Down
360 changes: 360 additions & 0 deletions arc/plotter.py

Large diffs are not rendered by default.

138 changes: 138 additions & 0 deletions arc/plotter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import shutil
import unittest

try:
import graphviz
except ImportError:
graphviz = None

import arc.plotter as plotter
from arc.common import ARC_PATH, ARC_TESTING_PATH, read_yaml_file, safe_copy_file
from arc.species.converter import str_to_xyz
Expand Down Expand Up @@ -218,6 +223,139 @@ def test_save_irc_traj_animation(self):
plotter.save_irc_traj_animation(irc_f_path, irc_r_path, out_path)
self.assertTrue(os.path.isfile(out_path))

def test_wrap_graph_label(self):
    """Test that _wrap_graph_label preserves intentional newlines."""
    # Explicit newlines in the input must survive wrapping untouched.
    wrapped = plotter._wrap_graph_label("opt\nopt_a1\ngaussian\nwb97xd/def2tzvp", width=30)
    wrapped_lines = wrapped.split('\n')
    for index, expected in enumerate(['opt', 'opt_a1', 'gaussian', 'wb97xd/def2tzvp']):
        self.assertEqual(wrapped_lines[index], expected)
    # A long single line must still be broken to fit the requested width.
    wrapped = plotter._wrap_graph_label("this is a very long label that should be wrapped", width=20)
    self.assertTrue(all(len(segment) <= 20 for segment in wrapped.split('\n')))
    # The empty string passes through unchanged.
    self.assertEqual(plotter._wrap_graph_label(''), '')

def test_save_provenance_artifacts(self):
    """Test saving ARC provenance YAML / Graphviz artifacts."""
    project = 'arc_project_for_testing_delete_after_usage'
    project_directory = os.path.join(ARC_PATH, 'Projects', project)
    # A representative event stream: one species and one TS, a normal
    # opt job, a freq job that errors and is troubleshot (spawning a
    # follow-up freq job), a TS guess job, and a TS guess selection.
    provenance = {
        'project': project,
        'run_id': 'run_1',
        'started_at': '2026-03-15T10:00:00',
        'ended_at': '2026-03-15T10:05:00',
        'events': [
            {'event_id': 1, 'event_type': 'species_initialized', 'timestamp': '2026-03-15T10:00:00',
             'label': 'spc1'},
            {'event_id': 2, 'event_type': 'species_initialized', 'timestamp': '2026-03-15T10:00:00',
             'label': 'TS0', 'is_ts': True},
            {'event_id': 3, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:00:01',
             'label': 'spc1', 'job_key': 'spc1:opt_a1', 'job_name': 'opt_a1', 'job_type': 'opt',
             'job_adapter': 'gaussian', 'level': 'b3lyp/6-31g(d)'},
            {'event_id': 4, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:01:00',
             'label': 'spc1', 'job_key': 'spc1:opt_a1', 'status': 'done', 'run_time': '0:01:00'},
            {'event_id': 5, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:01:01',
             'label': 'spc1', 'job_key': 'spc1:freq_a2', 'job_name': 'freq_a2', 'job_type': 'freq',
             'job_adapter': 'gaussian', 'level': 'b3lyp/6-31g(d)'},
            # freq_a2 errors with a 'memory' keyword, triggering troubleshooting.
            {'event_id': 6, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:01:30',
             'label': 'spc1', 'job_key': 'spc1:freq_a2', 'status': 'errored',
             'run_time': '0:00:30', 'keywords': ['memory']},
            {'event_id': 7, 'event_type': 'job_troubleshooting', 'timestamp': '2026-03-15T10:01:35',
             'label': 'spc1', 'job_key': 'spc1:freq_a2', 'job_name': 'freq_a2', 'job_type': 'freq',
             'methods': ['memory']},
            # freq_a3 is the troubleshoot follow-up of freq_a2.
            {'event_id': 8, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:01:40',
             'label': 'spc1', 'job_key': 'spc1:freq_a3', 'job_name': 'freq_a3', 'job_type': 'freq',
             'job_adapter': 'gaussian', 'provenance_parent_job': 'freq_a2',
             'provenance_reason': 'ess_troubleshoot'},
            {'event_id': 9, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:02:00',
             'label': 'spc1', 'job_key': 'spc1:freq_a3', 'status': 'done', 'run_time': '0:00:20'},
            {'event_id': 10, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:02:01',
             'label': 'TS0', 'job_key': 'TS0:tsg0', 'job_name': 'tsg0', 'job_type': 'tsg',
             'job_adapter': 'autotst'},
            {'event_id': 11, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:03:00',
             'label': 'TS0', 'job_key': 'TS0:tsg0', 'status': 'done'},
            {'event_id': 12, 'event_type': 'ts_guess_selected', 'timestamp': '2026-03-15T10:03:01',
             'label': 'TS0', 'selected_index': 0, 'method': 'autotst', 'energy': -154.321},
        ],
    }
    paths = plotter.save_provenance_artifacts(project_directory=project_directory, provenance=provenance)
    self.assertTrue(os.path.isfile(paths['yml']))
    # The DOT artifact is only produced when Graphviz is available.
    if paths['dot'] is not None:
        self.assertTrue(os.path.isfile(paths['dot']))
        with open(paths['dot'], 'r') as f:
            dot = f.read()
        # Species and job nodes are present.
        self.assertIn('spc1', dot)
        self.assertIn('opt_a1', dot)
        self.assertIn('TS0', dot)
        # Troubleshoot diamond and edge label rendered.
        self.assertIn('Troubleshoot', dot)
        self.assertIn('ess_troubleshoot', dot)
        # TS guess selection diamond rendered.
        self.assertIn('Select TS guess 0', dot)
        self.assertIn('autotst', dot)
        # Errored job node coloured correctly.
        self.assertIn('mistyrose', dot)
        # Normal jobs (opt_a1, freq_a2) connect from the species node, not from each other.
        self.assertIn('species_spc1 -> job_spc1_opt_a1', dot)
        self.assertIn('species_spc1 -> job_spc1_freq_a2', dot)
        # Troubleshoot follow-up connects from the decision diamond, not the species node.
        self.assertIn('decision_7 -> job_spc1_freq_a3', dot)

def test_render_provenance_graph(self):
    """Test Graphviz rendering from a ProvenanceGraph object."""
    from arc.provenance import DataKind, DecisionKind, EdgeType, ProvenanceGraph

    # Build a minimal species -> calc -> data -> decision chain.
    pgraph = ProvenanceGraph(project='render_test')
    species_id = pgraph.add_species_node(label='ethanol')
    calc_id = pgraph.add_calculation_node(label='ethanol', job_name='opt_a1',
                                          job_type='opt', job_adapter='gaussian',
                                          level='b3lyp/6-31g(d)', status='done')
    data_id = pgraph.add_data_node(label='ethanol', data_kind=DataKind.energy, value=-79.5)
    decision_id = pgraph.add_decision_node(label='ethanol',
                                           decision_kind=DecisionKind.conformer_selection,
                                           outcome='Selected conformer #0')
    pgraph.add_edge(species_id, calc_id, EdgeType.input_of)
    pgraph.add_edge(calc_id, data_id, EdgeType.output_of)
    pgraph.add_edge(data_id, decision_id, EdgeType.selected_by)

    if graphviz is not None:
        rendered = plotter.render_provenance_graph(pgraph, run_label='render_test')
        source = rendered.source
        expected_fragments = ['ethanol',
                              'opt',
                              'energy',
                              'conformer selection',
                              'honeydew',   # done calc
                              'cornsilk',   # data node
                              'diamond',    # decision node
                              'green3',     # selected_by edge
                              ]
        for fragment in expected_fragments:
            self.assertIn(fragment, source)

def test_save_provenance_artifacts_with_graph(self):
    """Test that save_provenance_artifacts prefers graph-based rendering when a graph is provided."""
    from arc.provenance import DecisionKind, EdgeType, ProvenanceGraph

    project = 'arc_project_for_testing_delete_after_usage'
    project_directory = os.path.join(ARC_PATH, 'Projects', project)
    # A minimal graph: one species node connected to one completed calc node.
    pgraph = ProvenanceGraph(project=project)
    species_id = pgraph.add_species_node(label='spc1')
    calc_id = pgraph.add_calculation_node(label='spc1', job_name='opt_a1',
                                          job_type='opt', status='done')
    pgraph.add_edge(species_id, calc_id, EdgeType.input_of)
    artifact_paths = plotter.save_provenance_artifacts(
        project_directory=project_directory,
        provenance={'project': project, 'events': []},
        graph=pgraph,
    )
    self.assertTrue(os.path.isfile(artifact_paths['yml']))
    if artifact_paths['dot'] is not None:
        with open(artifact_paths['dot'], 'r') as f:
            dot = f.read()
        # Graph-based rendering uses node IDs like species_1 not event-based species_spc1.
        self.assertIn('species_1', dot)
        self.assertIn('honeydew', dot)

@classmethod
def tearDownClass(cls):
Expand Down
38 changes: 38 additions & 0 deletions arc/provenance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
ARC provenance subpackage — directed acyclic graph for computational provenance.

Tracks the full chain of inputs, calculations, decisions, and outputs that
produce ARC's results. Inspired by AiiDA's DAG model but adapted for ARC's
branching decision trees (TS guess evaluation, conformer selection,
troubleshooting loops).

Submodules:
- ``nodes``: Node types, edge types, and their data classes.
- ``graph``: ProvenanceGraph container with query and serialization.
"""

from arc.provenance.graph import ProvenanceGraph
from arc.provenance.nodes import (
CalculationNode,
DataKind,
DataNode,
DecisionKind,
DecisionNode,
EdgeType,
NodeType,
ProvenanceEdge,
ProvenanceNode,
)

__all__ = [
'ProvenanceGraph',
'ProvenanceNode',
'CalculationNode',
'DataNode',
'DecisionNode',
'ProvenanceEdge',
'NodeType',
'DataKind',
'DecisionKind',
'EdgeType',
]
Loading
Loading