Skip to content

Commit 8977f6d

Browse files
Merge pull request #61 from SSCHAcode/new_cluster_interface
New cluster interface
2 parents d1092d2 + c21459a commit 8977f6d

3 files changed

Lines changed: 155 additions & 7 deletions

File tree

Modules/Cluster.py

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
pass
1414

1515
import numpy as np
16+
import time, datetime
1617

1718
from ase.units import Rydberg, Bohr
1819
import ase, ase.io
@@ -178,6 +179,10 @@ def __init__(self, hostname=None, pwd=None, extra_options="", workdir = "",
178179
self.timeout = 1000
179180
self.use_timeout = False
180181

182+
# Check the status of the job every TOT seconds
183+
self.check_timeout = 300
184+
self.nonblocking_command = False # True if you use a different version of slurm that does not accept blocking commands
185+
181186
# This is the number of configurations to be computed for each jub submitted
182187
# This times the self.batch_size is the total amount of configurations submitted toghether
183188
self.job_number = 1
@@ -275,7 +280,7 @@ def __setattr__(self, name, value):
275280

276281

277282

278-
def ExecuteCMD(self, cmd, raise_error = True, return_output = False):
283+
def ExecuteCMD(self, cmd, raise_error = True, return_output = False, on_cluster = False):
279284
"""
280285
EXECUTE THE CMD ON THE CLUSTER
281286
==============================
@@ -292,6 +297,8 @@ def ExecuteCMD(self, cmd, raise_error = True, return_output = False):
292297
return_output : bool, optional
293298
If True (default False) the output of the command is
294299
returned as second value.
300+
on_cluster : bool
301+
If true, the command is executed directly on the cluster through ssh
295302
296303
Returns
297304
-------
@@ -301,6 +308,9 @@ def ExecuteCMD(self, cmd, raise_error = True, return_output = False):
301308
output : string
302309
Returned only if return_output is True
303310
"""
311+
312+
if on_cluster:
313+
cmd = self.sshcmd + " {} '{}'".format(self.hostname, cmd)
304314

305315
success = False
306316
output = ""
@@ -333,6 +343,8 @@ def ExecuteCMD(self, cmd, raise_error = True, return_output = False):
333343
if return_output:
334344
return success, output
335345
return success
346+
347+
336348

337349

338350
def set_timeout(self, timeout):
@@ -520,10 +532,14 @@ def batch_submission(self, list_of_structures, calc, indices,
520532

521533
# Append the additional parameters
522534
for add_parameter in self.custom_params:
535+
adder_string = "--{}".format(add_parameter)
536+
if add_parameter.startswith("-"):
537+
adder_string = add_parameter
538+
523539
if self.custom_params[add_parameter] is None:
524-
submission += "#{} --{}\n".format(self.submit_name, add_parameter)
540+
submission += "#{} {}\n".format(self.submit_name, adder_string)
525541
else:
526-
submission += "#{} --{}={}\n".format(self.submit_name, add_parameter, self.custom_params[add_parameter])
542+
submission += "#{} {}={}\n".format(self.submit_name, adder_string, self.custom_params[add_parameter])
527543

528544

529545
# Add the set -x option
@@ -561,14 +577,27 @@ def batch_submission(self, list_of_structures, calc, indices,
561577
if not cp_res:
562578
print ("Error while executing:", cmd)
563579
print ("Return code:", cp_res)
564-
sys.stderr.write(cmd + ": exit with code " + str(cp_res))
580+
sys.stderr.write(cmd + ": exit with code " + str(cp_res) + "\n")
565581
return results#[None] * N_structs
566582

567583

568584
# Run the simulation
569585
cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command,
570586
self.workdir, label+ "_" + str(indices[0]))
571-
cp_res = self.ExecuteCMD(cmd, False)
587+
status, submission_output = self.ExecuteCMD(cmd, True, return_output = True)
588+
589+
# If the command for submission is non blocking, we need to check periodically wether the calculation has been completed
590+
if self.nonblocking_command:
591+
job_id = self.get_job_id_from_submission_output(submission_output)
592+
593+
now = datetime.datetime.now()
594+
sys.stderr.write("{}/{}/{} - {}:{} | submitted job id {} ({})\n".format(now.year, now.month, now.day, now.hour, now.minute, now.second, job_id, submission_output))
595+
sys.stderr.flush()
596+
time.sleep(self.check_timeout)
597+
598+
while not self.check_job_finished(job_id):
599+
time.sleep(self.check_timeout)
600+
572601
# cp_res = os.system(cmd + " > /dev/null")
573602
# if cp_res != 0:
574603
# print "Error while executing:", cmd
@@ -602,7 +631,58 @@ def batch_submission(self, list_of_structures, calc, indices,
602631
pass
603632

604633
return results
605-
634+
635+
def get_job_id_from_submission_output(self, output):
636+
"""
637+
GET THE JOB ID
638+
639+
Retreive the job id from the output of the submission.
640+
This depends on the software employed. It works for slurm.
641+
642+
Returns None if the output contains an error
643+
"""
644+
645+
try:
646+
id = output.split()[-1]
647+
return id
648+
except:
649+
print("Error, expected a standard output, but the result of the submission was: {}".format(output))
650+
return None
651+
652+
def check_job_finished(self, job_id, verbose = True):
653+
"""
654+
Check if the job identified by the job_id is finished
655+
656+
Parameters
657+
----------
658+
job_id : string
659+
The string that identifies uniquely the job
660+
"""
661+
662+
status, output = self.ExecuteCMD("squeue -u $USER", False, return_output = True, on_cluster = True, )
663+
lines = output.split("\n")
664+
if len(lines):
665+
for l in lines:
666+
data = l.strip().split()
667+
if data[0] == job_id:
668+
if verbose:
669+
now = datetime.datetime.now()
670+
sys.stderr.write("{}/{}/{} - {}:{}:{} | job {} still running\n".format(now.year, now.month, now.day, now.hour, now.minute, now.second, job_id))
671+
sys.stderr.flush()
672+
return False
673+
674+
# If I'm here it means I did not find the job, but the command returned at least 1 line (so it was correctly executed).
675+
if verbose:
676+
now = datetime.datetime.now()
677+
sys.stderr.write("{}/{}/{} - {}:{}:{} | job {} finished\n".format(now.year, now.month, now.day, now.hour, now.minute, now.second, job_id))
678+
sys.stderr.flush()
679+
return True
680+
if verbose:
681+
now = datetime.datetime.now()
682+
sys.stderr.write("{}/{}/{} - {}:{}:{} | error while interrogating the cluster for job {}\n".format(now.year, now.month, now.day, now.hour, now.minute, now.second, job_id))
683+
sys.stderr.flush()
684+
return False
685+
606686

607687
def run_atoms(self, ase_calc, ase_atoms, label="ESP",
608688
in_extension = ".pwi", out_extension=".pwo",

Modules/Ensemble.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
Bohr = 1/__A_TO_BOHR__
8383
__RyToK__ = 157887.32400374097
8484

85+
__GPa__ = 14710.50763554043
8586

8687
__DEBUG_RHO__ = False
8788

@@ -817,6 +818,73 @@ def save_enhanced_xyz(self, filename, append_mode = True, stress_key = "virial",
817818
# Force other processors to wait for the master
818819
CC.Settings.barrier()
819820

821+
def save_raw(self, root_directory, type_dict = None):
822+
"""
823+
Save the ensemble as a set of raw files.
824+
825+
This is the default format for training with deepmd
826+
827+
Parameters
828+
----------
829+
filename : string
830+
The directory on which to save the ensemble. If it does not exist, it is create.
831+
NOTE: this will overwrite any other ensemble saved in raw format in that directory
832+
type_dict : dict
833+
The dictionary between integers and atomic types. If not provided, it is generated on the spot and returned.
834+
835+
Returns
836+
-------
837+
type_dict : dict
838+
The dictionary of the parameters
839+
"""
840+
nat = self.current_dyn.structure.N_atoms * np.prod(self.current_dyn.GetSupercell())
841+
842+
if type_dict is None:
843+
atm = np.unique(self.current_dyn.structure.atoms)
844+
type_dict = {x : i for i, x in enumerate(atm)}
845+
846+
inv_dict = {i : x for x, i in type_dict.items()}
847+
848+
849+
# Save only if the current processor is the master
850+
if Parallel.am_i_the_master():
851+
if not os.path.exists(root_directory):
852+
os.makedirs(root_directory)
853+
854+
if not os.path.isdir(root_directory):
855+
raise IOError("Error, save_raw expects a directory, but '{}' is not a directory.".format(root_directory))
856+
857+
# Save the energies
858+
np.savetxt(os.path.join(root_directory, "energy.raw"), self.energies * Rydberg)
859+
860+
# Save the positions
861+
np.savetxt(os.path.join(root_directory, "coord.raw"), self.xats.reshape((self.N, 3 * nat)))
862+
863+
# Save the box
864+
np.savetxt(os.path.join(root_directory, "box.raw"), np.tile(self.current_dyn.structure.unit_cell.ravel(), (self.N, 1)))
865+
866+
# Save the forces
867+
np.savetxt(os.path.join(root_directory, "force.raw"), self.forces.reshape((self.N, 3*nat)) * Rydberg)
868+
869+
# Save the stress
870+
np.savetxt(os.path.join(root_directory, "virial.raw"), self.stresses.reshape((self.N, 9)) * __GPa__ * 10000)
871+
872+
# Save the types
873+
ss = self.current_dyn.structure.generate_supercell(self.current_dyn.GetSupercell())
874+
875+
with open(os.path.join(root_directory, "type_map.raw"), "w") as fp:
876+
line = " ".join([inv_dict[x] for x in np.arange(len(type_dict))])
877+
fp.write(line + "\n")
878+
879+
with open(os.path.join(root_directory, "type.raw"), "w") as fp:
880+
line = " ".join([str(type_dict[x]) for x in ss.atoms])
881+
fp.write(line + "\n")
882+
883+
884+
885+
# Force other processors to wait for the master
886+
CC.Settings.barrier()
887+
820888

821889

822890

tests/test_simple_relax/test_relax_other.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def test_simple_relax(verbose = False):
6969
minim.min_step_struc = 0.5
7070
minim.meaningful_factor = 1e-10
7171
minim.init()
72-
minim.run()
72+
minim.run(verbose = 0)
7373
minim.finalize()
7474

7575
# Check the differences in the atomic positions

0 commit comments

Comments
 (0)