Skip to content

Commit 83b84c5

Browse files
committed
Added a LocalCluster interface
1 parent 3f4ab41 commit 83b84c5

2 files changed

Lines changed: 42 additions & 81 deletions

File tree

Modules/Cluster.py

Lines changed: 17 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -488,9 +488,9 @@ def CheckCommunication(self):
488488
false otherwise.
489489
"""
490490

491-
cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname
492-
493-
status, output = self.ExecuteCMD(cmd, return_output = True)
491+
#cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname
492+
cmd = "echo ciao"
493+
status, output = self.ExecuteCMD(cmd, return_output = True, on_cluster = True)
494494

495495
# print cmd
496496
# p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
@@ -860,20 +860,26 @@ def submit(self, script_location):
860860
It is what returned from self.ExecuteCMD(cmd, False)
861861
"""
862862

863-
cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
864-
submit_cmd = self.submit_command, script = script_location)
865-
if self.use_active_shell:
866-
cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
867-
host = self.hostname,
868-
submit_cmd = self.submit_command, script = script_location,
869-
shell = self.terminal)
863+
cmd = f"{self.submit_command} {script_location}"
864+
#cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
865+
# submit_cmd = self.submit_command, script = script_location)
866+
867+
result, output = self.ExecuteCMD(cmd, False, return_output=True, on_cluster = True,
868+
use_active_shell = self.use_active_shell)
869+
870+
871+
# if self.use_active_shell:
872+
# cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
873+
# host = self.hostname,
874+
# submit_cmd = self.submit_command, script = script_location,
875+
# shell = self.terminal)
870876

871877

872878

873879
#cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command,
874880
# self.workdir, label+ "_" + str(indices[0]))
875881

876-
return self.ExecuteCMD(cmd, False, return_output=True)
882+
return result, output
877883

878884
def get_output_path(self, label):
879885
"""
@@ -1679,73 +1685,3 @@ def compute_ensemble(self, ensemble, ase_calc, get_stress = True, timeout=None):
16791685
self.compute_ensemble_batch(ensemble, ase_calc, get_stress, timeout)
16801686
return
16811687

1682-
"""
1683-
# Track the remaining configurations
1684-
success = [False] * ensemble.N
1685-
1686-
# Setup if the ensemble has the stress
1687-
ensemble.has_stress = get_stress
1688-
1689-
# Check if the working directory exists
1690-
if not os.path.isdir(self.local_workdir):
1691-
os.makedirs(self.local_workdir)
1692-
1693-
# Prepare the function for the simultaneous submission
1694-
def compute_single(num, calc):
1695-
atm = ensemble.structures[num].get_ase_atoms()
1696-
res = self.run_atoms(calc, atm, self.label + str(num),
1697-
n_nodes = self.n_nodes,
1698-
n_cpu=self.n_cpu,
1699-
npool = self.n_pool)
1700-
if res:
1701-
ensemble.energies[num] = res["energy"] / units["Ry"]
1702-
ensemble.forces[num, :, :] = res["forces"] / units["Ry"]
1703-
if get_stress:
1704-
stress = np.zeros((3,3), dtype = np.float64)
1705-
stress[0,0] = res["stress"][0]
1706-
stress[1,1] = res["stress"][1]
1707-
stress[2,2] = res["stress"][2]
1708-
stress[1,2] = res["stress"][3]
1709-
stress[2,1] = res["stress"][3]
1710-
stress[0,2] = res["stress"][4]
1711-
stress[2,0] = res["stress"][4]
1712-
stress[0,1] = res["stress"][5]
1713-
stress[1,0] = res["stress"][5]
1714-
# Remember, ase has a very strange definition of the stress
1715-
ensemble.stresses[num, :, :] = -stress * units["Bohr"]**3 / units["Ry"]
1716-
success[num] = True
1717-
1718-
# Get the expected number of batch
1719-
num_batch_offset = int(ensemble.N / self.batch_size)
1720-
1721-
# Run until some work has not finished
1722-
recalc = 0
1723-
while np.sum(np.array(success, dtype = int) - 1) != 0:
1724-
threads = []
1725-
1726-
# Get the remaining jobs
1727-
false_mask = np.array(success) == False
1728-
false_id = np.arange(ensemble.N)[false_mask]
1729-
1730-
count = 0
1731-
# Submit in parallel
1732-
for i in false_id:
1733-
# Submit only the batch size
1734-
if count >= self.batch_size:
1735-
break
1736-
t = threading.Thread(target = compute_single, args=(i, ase_calc, ))
1737-
t.start()
1738-
threads.append(t)
1739-
count += 1
1740-
1741-
1742-
# Wait until all the job have finished
1743-
for t in threads:
1744-
t.join(timeout)
1745-
1746-
recalc += 1
1747-
if recalc > num_batch_offset + self.max_recalc:
1748-
print ("Expected batch ordinary resubmissions:", num_batch_offset)
1749-
raise ValueError("Error, resubmissions exceeded the maximum number of %d" % self.max_recalc)
1750-
break
1751-
"""

Modules/LocalCluster.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import sscha.Cluster as Cluster
2+
import sys, os
3+
4+
"""
5+
Define a local cluster class.
6+
This allows to mock the cluster class and run the code locally, but by
7+
using the same interface as the cluster class and a job scheduler like SLURM.
8+
"""
9+
10+
11+
class LocalCluster(Cluster.Cluster):
12+
def ExecuteCMD(self, cmd, *args, on_cluster = False, **kwargs):
13+
"""
14+
Execute a command in the local machine.
15+
"""
16+
17+
# Override the value of on_cluster
18+
super().ExecuteCMD(cmd, *args, on_cluster = False, **kwargs)
19+
20+
def copy_file(self, source, destination, server_source = False, server_dest = False, **kwargs):
21+
"""
22+
Copy the files ignoring if the cluster is used.
23+
"""
24+
25+
super().copy_file(source, destination, server_source = False, server_dest = False, **kwargs)

0 commit comments

Comments
 (0)