Skip to content

Commit 3594f91

Browse files
GravityPHYpre-commit-ci[bot]njzjz
authored
add SGE class in pbs.py (#323)
This SGE class inherit from PBS machine class. I have tested with a vanilla python print job and it works. I can continue working on testing with more complex tasks. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
1 parent d3a6eb6 commit 3594f91

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

dpdispatcher/machines/pbs.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,3 +177,107 @@ def gen_script_header(self, job):
177177
**pbs_script_header_dict
178178
)
179179
return pbs_script_header
180+
181+
182+
sge_script_header_template = """
183+
#!/bin/bash
184+
#$ -N dpdispatcher_submit
185+
{select_node_line}
186+
#$ -cwd
187+
188+
"""
189+
190+
191+
class SGE(PBS):
192+
def __init__(
193+
self,
194+
batch_type=None,
195+
context_type=None,
196+
local_root=None,
197+
remote_root=None,
198+
remote_profile={},
199+
*,
200+
context=None,
201+
):
202+
super(PBS, self).__init__(
203+
batch_type,
204+
context_type,
205+
local_root,
206+
remote_root,
207+
remote_profile,
208+
context=context,
209+
)
210+
211+
def gen_script_header(self, job):
212+
resources = job.resources
213+
sge_script_header_dict = {}
214+
# resources.number_node is not used
215+
sge_script_header_dict[
216+
"select_node_line"
217+
] = f"#$ -pe mpi {resources.cpu_per_node} "
218+
# resources.queue_name is not necessary
219+
sge_script_header = sge_script_header_template.format(**sge_script_header_dict)
220+
return sge_script_header
221+
222+
def do_submit(self, job):
223+
script_file_name = job.script_file_name
224+
script_str = self.gen_script(job)
225+
job_id_name = job.job_hash + "_job_id"
226+
self.context.write_file(fname=script_file_name, write_str=script_str)
227+
script_file_dir = self.context.remote_root
228+
stdin, stdout, stderr = self.context.block_checkcall(
229+
"cd {} && {} {}".format(script_file_dir, "qsub", script_file_name)
230+
)
231+
subret = stdout.readlines()
232+
job_id = subret[0].split()[2]
233+
self.context.write_file(job_id_name, job_id)
234+
return job_id
235+
236+
def default_resources(self, resources):
237+
pass
238+
239+
def check_status(self, job):
240+
job_id = job.job_id
241+
status_line = None
242+
if job_id == "":
243+
return JobStatus.unsubmitted
244+
ret, stdin, stdout, stderr = self.context.block_call("qstat")
245+
err_str = stderr.read().decode("utf-8")
246+
if ret != 0:
247+
raise RuntimeError(
248+
"status command qstat fails to execute. erro info: %s return code %d"
249+
% (err_str, ret)
250+
)
251+
status_text_list = stdout.read().decode("utf-8").split("\n")
252+
for txt in status_text_list:
253+
if job_id in txt:
254+
status_line = txt
255+
256+
if status_line is None:
257+
count = 0
258+
while count <= 6:
259+
if self.check_finish_tag(job=job):
260+
return JobStatus.finished
261+
dlog.info(
262+
"not tag_finished detected, execute sync command and wait. count "
263+
+ str(count)
264+
)
265+
self.context.block_call("sync")
266+
import time
267+
268+
time.sleep(10)
269+
count += 1
270+
return JobStatus.terminated
271+
else:
272+
status_word = status_line.split()[4]
273+
# dlog.info (status_word)
274+
if status_word in ["qw"]:
275+
return JobStatus.waiting
276+
elif status_word in ["r"]:
277+
return JobStatus.running
278+
else:
279+
return JobStatus.unknown
280+
281+
def check_finish_tag(self, job):
282+
job_tag_finished = job.job_hash + "_job_tag_finished"
283+
return self.context.check_file_exists(job_tag_finished)

0 commit comments

Comments
 (0)