Skip to content

Commit ddfbd53

Browse files
Merge pull request #337 from SSCHAcode/local_cluster
Local cluster (2FA bypass)
2 parents 529ab99 + c1ad272 commit ddfbd53

2 files changed

Lines changed: 137 additions & 118 deletions

File tree

Modules/Cluster.py

Lines changed: 112 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -334,9 +334,49 @@ def __setattr__(self, name, value):
334334

335335

336336

337+
def copy_file(self, source, destination, server_source = False, server_dest = True, raise_error=False, **kwargs):
    """
    COPY A FILE
    ===========

    Copy a file or directory between the local machine and the cluster
    by running the configured scp command through ExecuteCMD.

    Each endpoint is independently marked as local or remote: a remote
    endpoint is prefixed with "<hostname>:". With the default flags the
    source is local and the destination is on the cluster.

    Subclasses can override this method to provide a different
    transfer mechanism.

    Parameters
    ----------
        source : string
            Path of the file (or directory) to copy.
        destination : string
            Path where the file must be copied.
        server_source : bool
            If True, the source path is located on the server.
        server_dest : bool
            If True, the destination path is located on the server.
        raise_error : bool
            Forwarded to ExecuteCMD as its raise_error argument.
        **kwargs :
            Any further keyword argument is forwarded to ExecuteCMD.

    Results
    -------
        result :
            Whatever ExecuteCMD returns for the scp command.
    """
    remote_prefix = "%s:" % self.hostname

    # Build the two scp endpoints, adding the host prefix only to remote ones
    endpoints = []
    for path, on_server in ((source, server_source), (destination, server_dest)):
        text = format(path)
        if on_server:
            text = remote_prefix + text
        endpoints.append(text)

    command = self.scpcmd + " " + " ".join(endpoints)
    return self.ExecuteCMD(command, raise_error = raise_error, **kwargs)
377+
378+
379+
def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False, use_active_shell = False):
340380
"""
341381
EXECUTE THE CMD ON THE CLUSTER
342382
==============================
@@ -355,6 +395,10 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster
355395
returned as second value.
356396
on_cluster : bool
357397
If true, the command is executed directly on the cluster through ssh
398+
use_active_shell : bool
399+
If true, the command is executed in a new shell on the cluster
400+
This is useful if the command is a script that must be executed
401+
in a new shell, or if the command requires .bashrc to be sourced.
358402
359403
Returns
360404
-------
@@ -367,6 +411,14 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster
367411

368412
if on_cluster:
369413
cmd = self.sshcmd + " {} '{}'".format(self.hostname, cmd)
414+
if use_active_shell:
415+
cmd = "{ssh} {host} -t '{shell} --login -c \"{command}\"'".format(ssh = self.sshcmd,
416+
host = self.hostname,
417+
command = parse_symbols(cmd),
418+
shell = self.terminal)
419+
420+
421+
370422

371423

372424
success = False
@@ -436,9 +488,9 @@ def CheckCommunication(self):
436488
false otherwise.
437489
"""
438490

439-
cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname
440-
441-
status, output = self.ExecuteCMD(cmd, return_output = True)
491+
#cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname
492+
cmd = "echo ciao"
493+
status, output = self.ExecuteCMD(cmd, return_output = True, on_cluster = True)
442494

443495
# print cmd
444496
# p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
@@ -630,7 +682,7 @@ def prepare_input_file(self, structures, calc, labels):
630682
631683
Error message:
632684
'''.format(label)
633-
MSG += str(e)
685+
MSG += str(repr(e))
634686
print(MSG)
635687

636688

@@ -710,8 +762,7 @@ def copy_files(self, list_of_input, list_of_output, to_server):
710762

711763

712764
# Clean eventually input/output file of this very same calculation
713-
cmd = self.sshcmd + " %s '%s'" % (self.hostname, rm_cmd)
714-
self.ExecuteCMD(cmd, False)
765+
self.ExecuteCMD(rm_cmd, False, on_cluster = True)
715766
# cp_res = os.system(cmd + " > /dev/null")
716767
# if cp_res != 0:
717768
# print "Error while executing:", cmd
@@ -720,8 +771,9 @@ def copy_files(self, list_of_input, list_of_output, to_server):
720771
#
721772

722773
# Copy the file into the cluster
723-
cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir)
724-
cp_res = self.ExecuteCMD(cmd, False)
774+
cp_res = self.copy_file(tar_file, self.workdir, raise_error=False)
775+
#cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir)
776+
#cp_res = self.ExecuteCMD(cmd, False)
725777
if not cp_res:
726778
print ("Error while executing:", cmd)
727779
print ("Return code:", cp_res)
@@ -734,8 +786,9 @@ def copy_files(self, list_of_input, list_of_output, to_server):
734786

735787
# Unpack the input files and remove the archive
736788
decompress = 'cd {}; tar xf {};'.format(self.workdir, tar_name)
737-
cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress)
738-
cp_res = self.ExecuteCMD(cmd, False)
789+
#cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress)
790+
#cp_res = self.ExecuteCMD(cmd, False)
791+
cp_res = self.ExecuteCMD(decompress, False, on_cluster = True)
739792
if not cp_res:
740793
print ("Error while executing:", cmd)
741794
print ("Return code:", cp_res)
@@ -751,16 +804,18 @@ def copy_files(self, list_of_input, list_of_output, to_server):
751804

752805
compress_cmd = 'cd {}; {}'.format(self.workdir, tar_command)
753806

754-
cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd)
755-
cp_res = self.ExecuteCMD(cmd, False)
807+
# cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd)
808+
# cp_res = self.ExecuteCMD(cmd, False)
809+
cp_res = self.ExecuteCMD(compress_cmd, False, on_cluster = True)
756810
if not cp_res:
757811
print ("Error while compressing the outputs:", cmd, list_of_output, "\nReturn code:", cp_res)
758812
#return cp_res
759813

760814

761-
# Copy the tar and unpack
762-
cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir)
763-
cp_res = self.ExecuteCMD(cmd, False)
815+
# Copy the tar from the server to the local and unpack
816+
# cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir)
817+
# cp_res = self.ExecuteCMD(cmd, False)
818+
cp_res = self.copy_file(os.path.join(self.workdir, tar_name), self.local_workdir, raise_error=False, server_source=True, server_dest=False)
764819
if not cp_res:
765820
print ("Error while executing:", cmd)
766821
print ("Return code:", cp_res)
@@ -805,20 +860,26 @@ def submit(self, script_location):
805860
It is what returned from self.ExecuteCMD(cmd, False)
806861
"""
807862

808-
cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
809-
submit_cmd = self.submit_command, script = script_location)
810-
if self.use_active_shell:
811-
cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
812-
host = self.hostname,
813-
submit_cmd = self.submit_command, script = script_location,
814-
shell = self.terminal)
863+
cmd = f"{self.submit_command} {script_location}"
864+
#cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
865+
# submit_cmd = self.submit_command, script = script_location)
866+
867+
result, output = self.ExecuteCMD(cmd, False, return_output=True, on_cluster = True,
868+
use_active_shell = self.use_active_shell)
869+
870+
871+
# if self.use_active_shell:
872+
# cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
873+
# host = self.hostname,
874+
# submit_cmd = self.submit_command, script = script_location,
875+
# shell = self.terminal)
815876

816877

817878

818879
#cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command,
819880
# self.workdir, label+ "_" + str(indices[0]))
820881

821-
return self.ExecuteCMD(cmd, False, return_output=True)
882+
return result, output
822883

823884
def get_output_path(self, label):
824885
"""
@@ -1114,10 +1175,9 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
11141175
submission += mpicmd + " " + binary + "\n"
11151176

11161177
# First of all clean eventually input/output file of this very same calculation
1117-
cmd = self.sshcmd + " %s 'rm -f %s/%s%s %s/%s%s'" % (self.hostname,
1118-
self.workdir, label, in_extension,
1119-
self.workdir, label, out_extension)
1120-
self.ExecuteCMD(cmd, False)
1178+
cmd = "rm -f %s/%s%s %s/%s%s" % (self.workdir, label, in_extension,
1179+
self.workdir, label, out_extension)
1180+
self.ExecuteCMD(cmd, False, on_cluster = True)
11211181
# cp_res = os.system(cmd)
11221182
# if cp_res != 0:
11231183
# print "Error while executing:", cmd
@@ -1128,16 +1188,18 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
11281188
f = open("%s/%s.sh" % (self.local_workdir, label), "w")
11291189
f.write(submission)
11301190
f.close()
1131-
cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir)
1132-
self.ExecuteCMD(cmd, False)
1191+
#cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir)
1192+
self.copy_file("%s/%s.sh" % (self.local_workdir, label), self.workdir, server_source=False, server_dest=True)
1193+
#self.ExecuteCMD(cmd, False)
11331194
# cp_res = os.system(cmd)
11341195
# if cp_res != 0:
11351196
# print "Error while executing:", cmd
11361197
# print "Return code:", cp_res
11371198
# sys.stderr.write(cmd + ": exit with code " + str(cp_res))
11381199
#
1139-
cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir)
1140-
cp_res = self.ExecuteCMD(cmd, False)
1200+
cp_res = self.copy_file("%s/%s%s" % (self.local_workdir, label, in_extension), self.workdir, server_source=False, server_dest=True, raise_error=False)
1201+
#cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir)
1202+
#cp_res = self.ExecuteCMD(cmd, False)
11411203
#cp_res = os.system(cmd)
11421204
if not cp_res:
11431205
#print "Error while executing:", cmd
@@ -1146,18 +1208,20 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
11461208
return
11471209

11481210
# Run the simulation
1149-
cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label)
1150-
self.ExecuteCMD(cmd, False)
1211+
cmd = "%s %s/%s.sh" % (self.submit_command, self.workdir, label)
1212+
#cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label)
1213+
self.ExecuteCMD(cmd, False, on_cluster = True)
11511214
# cp_res = os.system(cmd)
11521215
# if cp_res != 0:
11531216
# print "Error while executing:", cmd
11541217
# print "Return code:", cp_res
11551218
# sys.stderr.write(cmd + ": exit with code " + str(cp_res))
11561219

11571220
# Get the response
1158-
cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension,
1159-
self.local_workdir)
1160-
cp_res = self.ExecuteCMD(cmd, False)
1221+
#cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension,
1222+
#self.local_workdir)
1223+
#cp_res = self.ExecuteCMD(cmd, False)
1224+
cp_res = self.copy_file("%s/%s%s" % (self.workdir, label, out_extension), self.local_workdir, server_source=True, server_dest=False)
11611225
#cp_res = os.system(cmd)
11621226
if not cp_res:
11631227
print ("Error while executing:", cmd)
@@ -1401,10 +1465,11 @@ def setup_workdir(self, verbose = True):
14011465
"""
14021466
workdir = self.parse_string(self.workdir)
14031467

1404-
sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname,
1405-
workdir)
1468+
cmd = "mkdir -p %s" % workdir
1469+
# sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname,
1470+
# workdir)
14061471

1407-
self.ExecuteCMD(sshcmd, raise_error= True)
1472+
self.ExecuteCMD(cmd, raise_error= True, on_cluster=True)
14081473
#
14091474
# retval = os.system(sshcmd)
14101475
# if retval != 0:
@@ -1441,18 +1506,17 @@ def parse_string(self, string):
14411506

14421507
# Open a pipe with the server
14431508
# Use single ' to avoid string parsing by the local terminal
1444-
cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string)
1509+
#cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string)
1510+
cmd = f"echo \"{string}\""
14451511

1446-
if self.use_active_shell_for_parsing:
1447-
cmd = "{ssh} {host} -t '{shell} --login -c \"echo {string}\"'".format(ssh = self.sshcmd,
1448-
host = self.hostname,
1449-
string = parse_symbols(string),
1450-
shell = self.terminal)
1451-
#print cmd
1512+
status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True, use_active_shell = self.use_active_shell_for_parsing, on_cluster = True)
1513+
1514+
#print cmd
14521515

14531516
#print(cmd)
14541517

1455-
status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True)
1518+
#status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True)
1519+
14561520
#
14571521
# p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
14581522
# output, err = p.communicate()
@@ -1621,73 +1685,3 @@ def compute_ensemble(self, ensemble, ase_calc, get_stress = True, timeout=None):
16211685
self.compute_ensemble_batch(ensemble, ase_calc, get_stress, timeout)
16221686
return
16231687

1624-
"""
1625-
# Track the remaining configurations
1626-
success = [False] * ensemble.N
1627-
1628-
# Setup if the ensemble has the stress
1629-
ensemble.has_stress = get_stress
1630-
1631-
# Check if the working directory exists
1632-
if not os.path.isdir(self.local_workdir):
1633-
os.makedirs(self.local_workdir)
1634-
1635-
# Prepare the function for the simultaneous submission
1636-
def compute_single(num, calc):
1637-
atm = ensemble.structures[num].get_ase_atoms()
1638-
res = self.run_atoms(calc, atm, self.label + str(num),
1639-
n_nodes = self.n_nodes,
1640-
n_cpu=self.n_cpu,
1641-
npool = self.n_pool)
1642-
if res:
1643-
ensemble.energies[num] = res["energy"] / units["Ry"]
1644-
ensemble.forces[num, :, :] = res["forces"] / units["Ry"]
1645-
if get_stress:
1646-
stress = np.zeros((3,3), dtype = np.float64)
1647-
stress[0,0] = res["stress"][0]
1648-
stress[1,1] = res["stress"][1]
1649-
stress[2,2] = res["stress"][2]
1650-
stress[1,2] = res["stress"][3]
1651-
stress[2,1] = res["stress"][3]
1652-
stress[0,2] = res["stress"][4]
1653-
stress[2,0] = res["stress"][4]
1654-
stress[0,1] = res["stress"][5]
1655-
stress[1,0] = res["stress"][5]
1656-
# Remember, ase has a very strange definition of the stress
1657-
ensemble.stresses[num, :, :] = -stress * units["Bohr"]**3 / units["Ry"]
1658-
success[num] = True
1659-
1660-
# Get the expected number of batch
1661-
num_batch_offset = int(ensemble.N / self.batch_size)
1662-
1663-
# Run until some work has not finished
1664-
recalc = 0
1665-
while np.sum(np.array(success, dtype = int) - 1) != 0:
1666-
threads = []
1667-
1668-
# Get the remaining jobs
1669-
false_mask = np.array(success) == False
1670-
false_id = np.arange(ensemble.N)[false_mask]
1671-
1672-
count = 0
1673-
# Submit in parallel
1674-
for i in false_id:
1675-
# Submit only the batch size
1676-
if count >= self.batch_size:
1677-
break
1678-
t = threading.Thread(target = compute_single, args=(i, ase_calc, ))
1679-
t.start()
1680-
threads.append(t)
1681-
count += 1
1682-
1683-
1684-
# Wait until all the job have finished
1685-
for t in threads:
1686-
t.join(timeout)
1687-
1688-
recalc += 1
1689-
if recalc > num_batch_offset + self.max_recalc:
1690-
print ("Expected batch ordinary resubmissions:", num_batch_offset)
1691-
raise ValueError("Error, resubmissions exceeded the maximum number of %d" % self.max_recalc)
1692-
break
1693-
"""

Modules/LocalCluster.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import sscha.Cluster as Cluster
2+
import sys, os
3+
4+
"""
5+
Define a local cluster class.
6+
This allows to mock the cluster class and run the code locally, but by
7+
using the same interface as the cluster class and a job scheduler like SLURM.
8+
"""
9+
10+
11+
class LocalCluster(Cluster.Cluster):
    """
    Cluster that runs everything on the local machine.

    It exposes the same interface as Cluster.Cluster, but commands are never
    wrapped for remote execution and file copies never get a host prefix.
    """

    def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False,
                   use_active_shell = False, **kwargs):
        """
        Execute a command on the local machine.

        The signature mirrors Cluster.ExecuteCMD so that on_cluster can be
        given either positionally or by keyword without clashing with the
        value forced here.

        Parameters
        ----------
            cmd : string
                The command to execute.
            raise_error : bool
                Forwarded to Cluster.ExecuteCMD.
            return_output : bool
                Forwarded to Cluster.ExecuteCMD.
            on_cluster : bool
                Accepted for interface compatibility and ignored: the
                command is always executed with on_cluster = False.
            use_active_shell : bool
                Forwarded to Cluster.ExecuteCMD.
            **kwargs :
                Any further keyword argument is forwarded unchanged.

        Results
        -------
            Whatever Cluster.ExecuteCMD returns.
        """
        # NOTE(review): use_active_shell is forwarded unchanged; confirm that
        # the base class ignores it when on_cluster is False, otherwise the
        # command would still be routed through ssh.
        return super().ExecuteCMD(cmd, raise_error = raise_error, return_output = return_output,
                                  on_cluster = False, use_active_shell = use_active_shell, **kwargs)

    def copy_file(self, source, destination, server_source = False, server_dest = False,
                  raise_error = False, **kwargs):
        """
        Copy a file treating both paths as local.

        Parameters
        ----------
            source : string
                Path of the file to copy.
            destination : string
                Path where the file must be copied.
            server_source : bool
                Accepted for interface compatibility and ignored.
            server_dest : bool
                Accepted for interface compatibility and ignored.
            raise_error : bool
                Forwarded to Cluster.copy_file.
            **kwargs :
                Any further keyword argument is forwarded unchanged.

        Results
        -------
            Whatever Cluster.copy_file returns.
        """
        # Both endpoints are forced to be local, so no host prefix is added
        return super().copy_file(source, destination, server_source = False, server_dest = False,
                                 raise_error = raise_error, **kwargs)

0 commit comments

Comments
 (0)