From 19991a3bdc5eaf06829c414c30b17be6b8acb391 Mon Sep 17 00:00:00 2001 From: richard gowers Date: Mon, 1 Jan 2024 18:12:59 +0000 Subject: [PATCH 1/6] WIP of new storage backend in RFE protocol --- .../protocols/openmm_rfe/equil_rfe_methods.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index 97035386e..b534e4ad1 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -578,8 +578,9 @@ def __init__(self, *, ) def run(self, *, dry=False, verbose=True, - scratch_basepath=None, - shared_basepath=None) -> dict[str, Any]: + scratch_basepath, + shared_basepath, + permanent_basepath) -> dict[str, Any]: """Run the relative free energy calculation. Parameters @@ -591,10 +592,12 @@ def run(self, *, dry=False, verbose=True, verbose : bool Verbose output of the simulation progress. Output is provided via INFO level logging. - scratch_basepath: Pathlike, optional - Where to store temporary files, defaults to current working directory - shared_basepath : Pathlike, optional - Where to run the calculation, defaults to current working directory + scratch_basepath: StagingDirectory + Where to store temporary files + shared_basepath : StagingDirectory + Where to run the calculation + permanent_basepath : StagingDirectory + Where to store files that must persist beyond the DAG Returns ------- @@ -609,11 +612,6 @@ def run(self, *, dry=False, verbose=True, """ if verbose: self.logger.info("Preparing the hybrid topology simulation") - if scratch_basepath is None: - scratch_basepath = pathlib.Path('.') - if shared_basepath is None: - # use cwd - shared_basepath = pathlib.Path('.') # 0. General setup and settings dependency resolution step @@ -664,11 +662,13 @@ def run(self, *, dry=False, verbose=True, else: ffcache = None + ffcache.register() + system_generator = system_creation.get_system_generator( forcefield_settings=forcefield_settings, thermo_settings=thermo_settings, system_settings=system_settings, - cache=ffcache, + cache=ffcache.fspath, has_solvent=solvent_comp is not None, ) @@ -812,7 +812,7 @@ def run(self, *, dry=False, verbose=True, ) # a. Create the multistate reporter - nc = shared_basepath / sim_settings.output_filename + nc = (shared_basepath / sim_settings.output_filename).fspath chk = sim_settings.checkpoint_storage reporter = multistate.MultiStateReporter( storage=nc, @@ -947,7 +947,7 @@ def run(self, *, dry=False, verbose=True, sampling_method=sampler_settings.sampler_method.lower(), result_units=unit.kilocalorie_per_mole, ) - analyzer.plot(filepath=shared_basepath, filename_prefix="") + analyzer.plot(filepath=permanent_basepath, filename_prefix="") analyzer.close() else: @@ -1020,7 +1020,8 @@ def _execute( log_system_probe(logging.INFO, paths=[ctx.scratch]) with without_oechem_backend(): outputs = self.run(scratch_basepath=ctx.scratch, - shared_basepath=ctx.shared) + shared_basepath=ctx.shared, + permanent_basepath=ctx.permanent) analysis_outputs = self.analyse(ctx.shared) From 6b81bb65d0739559a19fbfdebbad221c8fa10fcc Mon Sep 17 00:00:00 2001 From: richard gowers Date: Thu, 4 Jan 2024 18:57:39 +0000 Subject: [PATCH 2/6] register files I'm expecting to be created --- openfe/protocols/openmm_rfe/equil_rfe_methods.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index b534e4ad1..42108adc9 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -812,10 +812,17 @@ def run(self, *, dry=False, verbose=True, ) # a. Create the multistate reporter - nc = (shared_basepath / sim_settings.output_filename).fspath + nc = (shared_basepath / sim_settings.output_filename) + checkpoint = (shared_basepath / "checkpoint.nc") + real_time_analysis = (shared_basepath / "real_time_analysis.yaml") + # have to flag these files as being created so that they get brought back + nc.register() + checkpoint.register() + real_time_analysis.register() + chk = sim_settings.checkpoint_storage reporter = multistate.MultiStateReporter( - storage=nc, + storage=nc.fspath, analysis_particle_indices=selection_indices, checkpoint_interval=sim_settings.checkpoint_interval.m, checkpoint_storage=chk, @@ -991,6 +998,8 @@ def run(self, *, dry=False, verbose=True, def analyse(where) -> dict: # don't put energy analysis in here, it uses the open file reporter # whereas structural stuff requires that the file handle is closed + where = where.fspath + ret = subprocess.run(['openfe_analysis', str(where)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) From 0571d282543d9d572dff08f8c30d53ebb2535a4a Mon Sep 17 00:00:00 2001 From: richard gowers Date: Tue, 9 Jan 2024 14:47:19 +0000 Subject: [PATCH 3/6] more WIP of new filesystem access currently fails with: UnsupportedOperation: not readable --- .../protocols/openmm_rfe/equil_rfe_methods.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index 42108adc9..47ee2c27e 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -812,8 +812,9 @@ def run(self, *, dry=False, verbose=True, ) # a. Create the multistate reporter + # TODO: Logic about keeping/not .nc files goes here nc = (shared_basepath / sim_settings.output_filename) - checkpoint = (shared_basepath / "checkpoint.nc") + checkpoint = (shared_basepath / sim_settings.checkpoint_storage) real_time_analysis = (shared_basepath / "real_time_analysis.yaml") # have to flag these files as being created so that they get brought back nc.register() @@ -959,8 +960,7 @@ def run(self, *, dry=False, verbose=True, else: # clean up the reporter file - fns = [shared_basepath / sim_settings.output_filename, - shared_basepath / sim_settings.checkpoint_storage] + fns = [nc.fspath, checkpoint.fspath] for fn in fns: os.remove(fn) finally: @@ -987,8 +987,8 @@ def run(self, *, dry=False, verbose=True, if not dry: # pragma: no-cover return { - 'nc': nc, - 'last_checkpoint': chk, + 'nc': nc.fspath, + 'last_checkpoint': checkpoint.fspath, **analyzer.unit_results_dict } else: @@ -998,27 +998,27 @@ def run(self, *, dry=False, verbose=True, def analyse(where) -> dict: # don't put energy analysis in here, it uses the open file reporter # whereas structural stuff requires that the file handle is closed - where = where.fspath - - ret = subprocess.run(['openfe_analysis', str(where)], + output = (where / 'results.json') + ret = subprocess.run(['openfe_analysis', 'RFE_analysis', + where.fspath, output], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if ret.returncode: return {'structural_analysis_error': ret.stderr} - data = json.loads(ret.stdout) + with open(output, 'w') as f: + data = json.load(f) - savedir = pathlib.Path(where) if d := data['protein_2D_RMSD']: fig = plotting.plot_2D_rmsd(d) - fig.savefig(savedir / "protein_2D_RMSD.png") + fig.savefig(where / "protein_2D_RMSD.png") plt.close(fig) f2 = plotting.plot_ligand_COM_drift(data['time(ps)'], data['ligand_wander']) - f2.savefig(savedir / "ligand_COM_drift.png") + f2.savefig(where / "ligand_COM_drift.png") plt.close(f2) f3 = plotting.plot_ligand_RMSD(data['time(ps)'], data['ligand_RMSD']) - f3.savefig(savedir / "ligand_RMSD.png") + f3.savefig(where / "ligand_RMSD.png") plt.close(f3) return {'structural_analysis': data} From ecd6716df737d79bf238436b06ef0bdfe95c6268 Mon Sep 17 00:00:00 2001 From: richard gowers Date: Wed, 10 Jan 2024 14:11:12 +0000 Subject: [PATCH 4/6] more WIP of new filesystem access --- .../protocols/openmm_rfe/equil_rfe_methods.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index 47ee2c27e..7ac3690ff 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -50,6 +50,7 @@ settings, ChemicalSystem, LigandAtomMapping, Component, ComponentMapping, SmallMoleculeComponent, ProteinComponent, SolventComponent, ) +from gufe.storage import stagingregistry from .equil_rfe_settings import ( RelativeHybridTopologyProtocolSettings, SystemSettings, @@ -578,9 +579,10 @@ def __init__(self, *, ) def run(self, *, dry=False, verbose=True, - scratch_basepath, - shared_basepath, - permanent_basepath) -> dict[str, Any]: + scratch_basepath: pathlib.Path, + shared_basepath: stagingregistry.StagingRegistry, + permanent_basepath: stagingregistry.StagingRegistry, + ) -> dict[str, Any]: """Run the relative free energy calculation. Parameters @@ -668,7 +670,7 @@ def run(self, *, dry=False, verbose=True, forcefield_settings=forcefield_settings, thermo_settings=thermo_settings, system_settings=system_settings, - cache=ffcache.fspath, + cache=ffcache.as_path(), has_solvent=solvent_comp is not None, ) @@ -823,7 +825,7 @@ def run(self, *, dry=False, verbose=True, chk = sim_settings.checkpoint_storage reporter = multistate.MultiStateReporter( - storage=nc.fspath, + storage=str(nc.as_path()), analysis_particle_indices=selection_indices, checkpoint_interval=sim_settings.checkpoint_interval.m, checkpoint_storage=chk, @@ -960,7 +962,7 @@ def run(self, *, dry=False, verbose=True, else: # clean up the reporter file - fns = [nc.fspath, checkpoint.fspath] + fns = [nc.as_path(), checkpoint.as_path()] for fn in fns: os.remove(fn) finally: @@ -987,26 +989,28 @@ def run(self, *, dry=False, verbose=True, if not dry: # pragma: no-cover return { - 'nc': nc.fspath, - 'last_checkpoint': checkpoint.fspath, + 'nc': nc.as_path(), + 'last_checkpoint': checkpoint.as_path(), **analyzer.unit_results_dict } else: return {'debug': {'sampler': sampler}} @staticmethod - def analyse(where) -> dict: + def analyse(where: stagingregistry.StagingRegistry) -> dict: # don't put energy analysis in here, it uses the open file reporter # whereas structural stuff requires that the file handle is closed + trjdir = (where / '') output = (where / 'results.json') ret = subprocess.run(['openfe_analysis', 'RFE_analysis', - where.fspath, output], + str(trjdir.as_path()), + str(output.as_path())], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if ret.returncode: return {'structural_analysis_error': ret.stderr} - with open(output, 'w') as f: + with open(output, 'r') as f: data = json.load(f) if d := data['protein_2D_RMSD']: From 37d448c3eba0564fea9ac565b5edcc2049e8e7f3 Mon Sep 17 00:00:00 2001 From: richard gowers Date: Mon, 15 Jan 2024 14:34:44 +0000 Subject: [PATCH 5/6] pass StagingPath objects back in Unit return --- openfe/protocols/openmm_rfe/equil_rfe_methods.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index 7ac3690ff..b96054959 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -989,8 +989,8 @@ def run(self, *, dry=False, verbose=True, if not dry: # pragma: no-cover return { - 'nc': nc.as_path(), - 'last_checkpoint': checkpoint.as_path(), + 'nc': nc, + 'last_checkpoint': checkpoint, **analyzer.unit_results_dict } else: From fb85a58e66ea3eba4bbbf17e4adf87bb3ba37ca7 Mon Sep 17 00:00:00 2001 From: richard gowers Date: Mon, 22 Jan 2024 14:33:42 +0000 Subject: [PATCH 6/6] fixed StagingPath type annotations --- openfe/protocols/openmm_rfe/equil_rfe_methods.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/openfe/protocols/openmm_rfe/equil_rfe_methods.py b/openfe/protocols/openmm_rfe/equil_rfe_methods.py index b96054959..c494e7e6b 100644 --- a/openfe/protocols/openmm_rfe/equil_rfe_methods.py +++ b/openfe/protocols/openmm_rfe/equil_rfe_methods.py @@ -580,8 +580,8 @@ def __init__(self, *, def run(self, *, dry=False, verbose=True, scratch_basepath: pathlib.Path, - shared_basepath: stagingregistry.StagingRegistry, - permanent_basepath: stagingregistry.StagingRegistry, + shared_basepath: stagingregistry.StagingPath, + permanent_basepath: stagingregistry.StagingPath, ) -> dict[str, Any]: """Run the relative free energy calculation. @@ -594,11 +594,11 @@ def run(self, *, dry=False, verbose=True, verbose : bool Verbose output of the simulation progress. Output is provided via INFO level logging. - scratch_basepath: StagingDirectory + scratch_basepath: pathlib.Path Where to store temporary files - shared_basepath : StagingDirectory + shared_basepath : StagingPath Where to run the calculation - permanent_basepath : StagingDirectory + permanent_basepath : StagingPath Where to store files that must persist beyond the DAG Returns @@ -997,13 +997,12 @@ def run(self, *, dry=False, verbose=True, return {'debug': {'sampler': sampler}} @staticmethod - def analyse(where: stagingregistry.StagingRegistry) -> dict: + def analyse(where: stagingregistry.StagingPath) -> dict: # don't put energy analysis in here, it uses the open file reporter # whereas structural stuff requires that the file handle is closed - trjdir = (where / '') output = (where / 'results.json') ret = subprocess.run(['openfe_analysis', 'RFE_analysis', - str(trjdir.as_path()), + str(where.as_path()), str(output.as_path())], stdout=subprocess.PIPE, stderr=subprocess.PIPE)