Commit 661882a

add "JH_UniScheduler" batch_type (#459)

JH UniScheduler was developed by the JHINNO company. It is commercial software and uses `jsub` to submit tasks. Its overall architecture is similar to that of IBM's LSF, but there are still some differences between them.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

### Release Notes

- **New Features**
  - Introduced support for a new scheduler system, `JH_UniScheduler`, in the DPDispatcher Python package.
  - Added documentation and setup instructions for `JH_UniScheduler`.
- **Documentation**
  - Updated descriptions in the documentation to include `JH_UniScheduler`.
  - Added detailed setup requirements and usage instructions for `JH_UniScheduler`.
- **Testing**
  - Added new test cases to ensure proper functionality of `JH_UniScheduler`.
  - Included configuration files and simulation input commands for testing `JH_UniScheduler`.
- **Configuration**
  - Updated project configuration to include `jh_unischeduler` as a keyword.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

1 parent b31c1ea · commit 661882a

19 files changed: +625 −10 lines

doc/batch.md (14 additions & 6 deletions)

```diff
@@ -18,7 +18,7 @@ To avoid running multiple jobs at the same time, one could set {dargs:argument}`
 {dargs:argument}`batch_type <resources/batch_type>`: `Slurm`, `SlurmJobArray`
 
 [Slurm](https://slurm.schedmd.com/) is a job scheduling system used by lots of HPCs.
-One needs to make sure slurm has been setup in the remote server and the related environment is activated.
+One needs to make sure slurm has been set up in the remote server and the related environment is activated.
 
 When `SlurmJobArray` is used, dpdispatcher submits Slurm jobs with [job arrays](https://slurm.schedmd.com/job_array.html).
 In this way, several dpdispatcher {class}`task <dpdispatcher.submission.Task>`s map to a Slurm job and a dpdispatcher {class}`job <dpdispatcher.submission.Job>` maps to a Slurm job array.
@@ -30,7 +30,7 @@ One can use {dargs:argument}`group_size <resources/group_size>` and {dargs:argum
 {dargs:argument}`batch_type <resources/batch_type>`: `PBS`
 
 [OpenPBS](https://www.openpbs.org/) is an open-source job scheduling of the Linux Foundation and [PBS Profession](https://www.altair.com/pbs-professional/) is its commercial solution.
-One needs to make sure OpenPBS has been setup in the remote server and the related environment is activated.
+One needs to make sure OpenPBS has been set up in the remote server and the related environment is activated.
 
 Note that do not use `PBS` for Torque.
 
@@ -40,14 +40,22 @@ Note that do not use `PBS` for Torque.
 
 The [Terascale Open-source Resource and QUEue Manager (TORQUE)](https://adaptivecomputing.com/cherry-services/torque-resource-manager/) is a distributed resource manager based on standard OpenPBS.
 However, not all OpenPBS flags are still supported in TORQUE.
-One needs to make sure TORQUE has been setup in the remote server and the related environment is activated.
+One needs to make sure TORQUE has been set up in the remote server and the related environment is activated.
 
 ## LSF
 
 {dargs:argument}`batch_type <resources/batch_type>`: `LSF`
 
 [IBM Spectrum LSF Suites](https://www.ibm.com/products/hpc-workload-management) is a comprehensive workload management solution used by HPCs.
-One needs to make sure LSF has been setup in the remote server and the related environment is activated.
+One needs to make sure LSF has been set up in the remote server and the related environment is activated.
+
+## JH UniScheduler
+
+{dargs:argument}`batch_type <resources/batch_type>`: `JH_UniScheduler`
+
+[JH UniScheduler](http://www.jhinno.com/m/custom_case_05.html) was developed by JHINNO company and uses "jsub" to submit tasks.
+Its overall architecture is similar to that of IBM's LSF. However, there are still some differences between them. One needs to
+make sure JH UniScheduler has been set up in the remote server and the related environment is activated.
 
 ## Bohrium
 
@@ -74,10 +82,10 @@ Read Fujitsu cloud service documentation for details.
 ## OpenAPI
 
 {dargs:argument}`batcy_type <resources/batch_type>`: `OpenAPI`
-OpenAPI is a new way to submit jobs to Bohrium. It using [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.
+OpenAPI is a new way to submit jobs to Bohrium. It is using [AccessKey](https://bohrium.dp.tech/personal/setting) instead of username and password. Read Bohrium documentation for details.
 
 ## SGE
 
 {dargs:argument}`batch_type <resources/batch_type>`: `SGE`
 
-The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system distributed resource management. The commands and flags of SGE share a lot similarity with PBS except when checking job status. Use this argument if one is submitting job to SGE based batch system.
+The [Sun Grid Engine (SGE) scheduler](https://gridscheduler.sourceforge.net) is a batch-queueing system distributed resource management. The commands and flags of SGE share a lot of similarity with PBS except when checking job status. Use this argument if one is submitting job to an SGE-based batch system.
```
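The new JH UniScheduler doc section describes the `#JSUB` directives dpdispatcher emits. As a minimal sketch, here is how the resource fields map onto the header template added in this commit (the resource values — 1 node, 4 CPUs, 1 GPU, queue `gpu` — are hypothetical examples matching the test configs in this commit):

```python
# Mirror of the JH_UniScheduler script header template from this commit;
# the resource numbers below are hypothetical example inputs.
template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{nodes_line}
{ptile_line}
{partition_line}
{gpu_line}"""

number_node, cpu_per_node, gpu_per_node, queue_name = 1, 4, 1, "gpu"

header = template.format(
    nodes_line=f"#JSUB -n {number_node * cpu_per_node}",  # total CPU slots
    ptile_line=f"#JSUB -R 'span[ptile={cpu_per_node}]'",  # slots per host
    partition_line=f"#JSUB -q {queue_name}",              # target queue
    gpu_line=f"#JSUB -gpgpu {gpu_per_node}",              # GPUs requested
)
print(header)
```

Note that `-n` requests total slots (`number_node * cpu_per_node`), while `span[ptile=...]` pins how many of them land on each host.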

doc/index.rst (1 addition & 1 deletion)

```diff
@@ -6,7 +6,7 @@
 DPDispatcher's documentation
 ======================================
 
-DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.
+DPDispatcher is a Python package used to generate HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/JH_UniScheduler/dpcloudserver) jobs input scripts and submit these scripts to HPC systems and poke until they finish.
 
 DPDispatcher will monitor (poke) until these jobs finish and download the results files (if these jobs is running on remote systems connected by SSH).
```

dpdispatcher/machines/JH_UniScheduler.py (new file, per the import added in tests/context.py)

Lines changed: 175 additions & 0 deletions

```python
import shlex
from typing import List

from dargs import Argument

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
    RetrySignal,
    customized_script_header_template,
    retry,
)

JH_UniScheduler_script_header_template = """\
#!/bin/bash -l
#JSUB -e %J.err
#JSUB -o %J.out
{JH_UniScheduler_nodes_line}
{JH_UniScheduler_ptile_line}
{JH_UniScheduler_partition_line}
{JH_UniScheduler_number_gpu_line}"""


class JH_UniScheduler(Machine):
    """JH_UniScheduler batch."""

    def gen_script(self, job):
        JH_UniScheduler_script = super().gen_script(job)
        return JH_UniScheduler_script

    def gen_script_header(self, job):
        resources = job.resources
        script_header_dict = {
            "JH_UniScheduler_nodes_line": f"#JSUB -n {resources.number_node * resources.cpu_per_node}",
            "JH_UniScheduler_ptile_line": f"#JSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "JH_UniScheduler_partition_line": f"#JSUB -q {resources.queue_name}",
        }
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)
        if not custom_gpu_line:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = (
                f"#JSUB -gpgpu {resources.gpu_per_node}"
            )
        else:
            script_header_dict["JH_UniScheduler_number_gpu_line"] = custom_gpu_line
        if (
            resources["strategy"].get("customized_script_header_template_file")
            is not None
        ):
            JH_UniScheduler_script_header = customized_script_header_template(
                resources["strategy"]["customized_script_header_template_file"],
                resources,
            )
        else:
            JH_UniScheduler_script_header = (
                JH_UniScheduler_script_header_template.format(**script_header_dict)
            )

        return JH_UniScheduler_script_header

    @retry()
    def do_submit(self, job):
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            stdin, stdout, stderr = self.context.block_checkcall(
                "cd {} && {} {}".format(
                    shlex.quote(self.context.remote_root),
                    "jsub < ",
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        job_id = subret[0].split()[1][1:-1]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        pass

    @retry()
    def check_status(self, job):
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, stdin, stdout, stderr = self.context.block_call("jjobs " + job_id)
        err_str = stderr.read().decode("utf-8")
        if (f"Job <{job_id}> is not found") in err_str:
            if self.check_finish_tag(job):
                return JobStatus.finished
            else:
                return JobStatus.terminated
        elif ret != 0:
            # just retry when any unknown error raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str)
            )
        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        else:
            status_line = status_out[1]
            status_word = status_line.split()[2]

        if status_word in ["PEND"]:
            return JobStatus.waiting
        elif status_word in ["RUN", "PSUSP", "SSUSP", "USUSP"]:
            return JobStatus.running
        elif status_word in ["DONE", "EXIT"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            else:
                return JobStatus.terminated
        else:
            return JobStatus.unknown

    def check_finish_tag(self, job):
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)

    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #JSUB"

        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]

    def kill(self, job):
        """Kill the job.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        ret, stdin, stdout, stderr = self.context.block_call(
            "jctrl kill " + str(job_id)
        )
```
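In `do_submit` above, the job id is recovered from the first stdout line of `jsub` with `subret[0].split()[1][1:-1]`. A small illustration of that parsing, assuming `jsub` prints an LSF-style acknowledgement (the sample line below is hypothetical):

```python
# Hypothetical LSF-style acknowledgement from jsub; the exact wording may
# differ, but the id is the second whitespace token, wrapped in "<...>".
ack_line = "Job <12345> is submitted to queue <gpu>."

token = ack_line.split()[1]  # "<12345>"
job_id = token[1:-1]         # strip the angle brackets
print(job_id)                # prints 12345
```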

pyproject.toml (1 addition & 1 deletion)

```diff
@@ -32,7 +32,7 @@ dependencies = [
 ]
 requires-python = ">=3.7"
 readme = "README.md"
-keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh"]
+keywords = ["dispatcher", "hpc", "slurm", "lsf", "pbs", "ssh", "jh_unischeduler"]
 
 [project.urls]
 Homepage = "https://github.com/deepmodeling/dpdispatcher"
```

tests/context.py (1 addition & 0 deletions)

```diff
@@ -19,6 +19,7 @@
 from dpdispatcher.machine import Machine  # noqa: F401
 from dpdispatcher.machines.distributed_shell import DistributedShell  # noqa: F401
 from dpdispatcher.machines.dp_cloud_server import Lebesgue  # noqa: F401
+from dpdispatcher.machines.JH_UniScheduler import JH_UniScheduler  # noqa: F401
 from dpdispatcher.machines.lsf import LSF  # noqa: F401
 from dpdispatcher.machines.pbs import PBS  # noqa: F401
 from dpdispatcher.machines.shell import Shell  # noqa: F401
```
(new test script) Lines changed: 57 additions & 0 deletions

```python
import json
import os
import sys

from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

# task_need_resources has no effect
with open("jsons/machine_jh_unischeduler.json") as f:
    mdata = json.load(f)

machine = Machine.load_from_dict(mdata["machine"])
resources = Resources.load_from_dict(mdata["resources"])

submission = Submission(
    work_base="0_md/",
    machine=machine,
    resources=resources,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

task1 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-1/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task2 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-2/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task3 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-3/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
task4 = Task(
    command="lmp -i input.lammps",
    task_work_path="bct-4/",
    forward_files=["conf.lmp", "input.lammps"],
    backward_files=["log.lammps"],
)
submission.register_task_list(
    [
        task1,
        task2,
        task3,
        task4,
    ]
)
submission.run_submission(clean=True)
```
(new machine config) Lines changed: 16 additions & 0 deletions

```json
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "local",
        "local_root": "./",
        "remote_root": "/data/home/wangsimin/machine_learning/dpgen/task/test/dpgen_example/run1"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "gpu",
        "group_size": 4,
        "source_list": ["/public/software/deepmd-kit/bin/activate /public/software/deepmd-kit"]
    }
}
```
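Before handing a config like the one above to dpdispatcher, a quick standalone sanity check of the key structure can catch mistakes early. A sketch, not a dpdispatcher API; the `remote_root` value here is a made-up placeholder:

```python
import json

# Placeholder config mirroring the structure of the JSON above;
# the remote_root path is a made-up example.
config_text = """
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "local",
        "local_root": "./",
        "remote_root": "/tmp/example_remote_root"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "gpu",
        "group_size": 4
    }
}
"""

mdata = json.loads(config_text)

# Fields the JH_UniScheduler header generation always reads.
required = {"number_node", "cpu_per_node", "queue_name"}
missing = required - mdata["resources"].keys()

assert mdata["machine"]["batch_type"] == "JH_UniScheduler"
print(sorted(missing))  # prints [] when nothing is missing
```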
(new test config) Lines changed: 18 additions & 0 deletions

```json
{
    "machine": {
        "batch_type": "JH_UniScheduler",
        "context_type": "LazyLocalContext",
        "local_root": "./test_jh_unischeduler"
    },
    "resources": {
        "number_node": 1,
        "cpu_per_node": 4,
        "queue_name": "gpu",
        "gpu_per_node": 1,
        "group_size": 4,
        "strategy": {
            "if_cuda_multi_devices": false
        },
        "source_list": ["./slurm_test.env"]
    }
}
```
