Skip to content

Commit 95264a5

Browse files
authored
Merge branch 'deepmodeling:master' into master
2 parents 4b63bd2 + 3340249 commit 95264a5

File tree

7 files changed

+63
-34
lines changed

7 files changed

+63
-34
lines changed

conda/meta.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ requirements:
2525
- dargs
2626
- paramiko
2727
- requests
28+
- tqdm
2829

2930
run:
3031
- python >=3.6
3132
- dargs
3233
- paramiko
3334
- requests
35+
- tqdm
3436

3537
test:
3638
imports:

dpdispatcher/hdfs_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,6 @@ def read_file(self, fname):
230230

231231
def check_file_exists(self, fname):
232232
return HDFS.exists(os.path.join(self.remote_root, fname))
233+
234+
def kill(self, job_id):
235+
pass

dpdispatcher/lazy_local_context.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dpdispatcher.base_context import BaseContext
2-
import os
2+
import os,signal
33
import subprocess as sp
44

55
class SPRetObj(object) :
@@ -149,8 +149,8 @@ def call(self, cmd) :
149149
os.chdir(cwd)
150150
return proc
151151

152-
def kill(self, proc):
153-
proc.kill()
152+
def kill(self, job_id):
153+
os.kill(job_id, signal.SIGTERM)
154154

155155
def check_finish(self, proc):
156156
return (proc.poll() != None)

dpdispatcher/local_context.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dpdispatcher.base_context import BaseContext
2-
import os,shutil,hashlib
2+
import os,shutil,hashlib,signal
33
import subprocess as sp
44
from glob import glob
55
from dpdispatcher import dlog
@@ -397,8 +397,8 @@ def call(self, cmd) :
397397
os.chdir(cwd)
398398
return proc
399399

400-
def kill(self, proc):
401-
proc.kill()
400+
def kill(self, job_id):
401+
os.kill(job_id, signal.SIGTERM)
402402

403403
def check_finish(self, proc):
404404
return (proc.poll() != None)

dpdispatcher/ssh_context.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -533,27 +533,15 @@ def _get_files(self,
533533
of = self.submission.submission_hash + '.tar.gz'
534534
# remote tar
535535
# If the number of files are large, we may get "Argument list too long" error.
536-
# Thus, we may run tar commands for several times and tar only 100 files for
537-
# each time.
536+
# Thus, "-T" accepts a file containing the list of files
538537
per_nfile = 100
539538
ntar = len(files) // per_nfile + 1
540539
if ntar <= 1:
541540
self.block_checkcall('tar czfh %s %s' % (of, " ".join(files)))
542541
else:
543-
of_tar = self.submission.submission_hash + '.tar'
544-
for ii in range(ntar):
545-
ff = files[per_nfile * ii : per_nfile * (ii+1)]
546-
if ii == 0:
547-
# tar cf for the first time
548-
self.block_checkcall('tar cfh %s %s' % (of_tar, " ".join(ff)))
549-
else:
550-
# append using tar rf
551-
# -r, --append append files to the end of an archive
552-
self.block_checkcall('tar rfh %s %s' % (of_tar, " ".join(ff)))
553-
# compress the tar file using gzip, and will get a tar.gz file
554-
# overwrite considering dpgen may stop and restart
555-
# -f, --force force overwrite of output file and compress links
556-
self.block_checkcall('gzip -f %s' % of_tar)
542+
file_list_file = os.path.join(self.remote_root, ".tmp.tar." + str(uuid.uuid4()))
543+
self.write_file(file_list_file, "\n".join(files))
544+
self.block_checkcall('tar czfh %s -T %s' % (of, file_list_file))
557545
# trans
558546
from_f = pathlib.PurePath(os.path.join(self.remote_root, of)).as_posix()
559547
to_f = pathlib.PurePath(os.path.join(self.local_root, of)).as_posix()

dpdispatcher/submission.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11

22
# %%
33
import time,random,uuid,json,copy
4-
54
from dargs.dargs import Argument, Variant
65
from dpdispatcher.JobStatus import JobStatus
76
from dpdispatcher import dlog
@@ -10,7 +9,7 @@
109
from dpdispatcher.machine import Machine
1110
# from dpdispatcher.slurm import SlurmResources
1211
#%%
13-
default_strategy = dict(if_cuda_multi_devices=False)
12+
default_strategy = dict(if_cuda_multi_devices=False, ratio_unfinished=0.0)
1413

1514
class Submission(object):
1615
"""A submission represents a collection of tasks.
@@ -181,11 +180,16 @@ def run_submission(self, *, exit_on_submit=False, clean=True):
181180
self.check_all_finished()
182181
self.handle_unexpected_submission_state()
183182

183+
ratio_unfinished = self.resources.strategy['ratio_unfinished']
184184
while not self.check_all_finished():
185185
if exit_on_submit is True:
186186
dlog.info(f"submission succeeded: {self.submission_hash}")
187187
dlog.info(f"at {self.machine.context.remote_root}")
188188
return self.serialize()
189+
if ratio_unfinished > 0.0 and self.check_ratio_unfinished(ratio_unfinished):
190+
self.remove_unfinished_jobs()
191+
break
192+
189193
try:
190194
time.sleep(30)
191195
except (Exception, KeyboardInterrupt, SystemExit) as e:
@@ -252,6 +256,30 @@ def handle_unexpected_submission_state(self):
252256

253257
# def update_submi
254258

259+
def check_ratio_unfinished(self, ratio_unfinished):
260+
status_list = [job.job_state for job in self.belonging_jobs]
261+
finished_num = status_list.count(JobStatus.finished)
262+
if finished_num / len(self.belonging_jobs) < (1 - ratio_unfinished):
263+
return False
264+
else:
265+
return True
266+
267+
def remove_unfinished_jobs(self):
268+
removed_jobs = [job for job in self.belonging_jobs if job.job_state not in [JobStatus.finished]]
269+
self.belonging_jobs = [job for job in self.belonging_jobs if job.job_state in [JobStatus.finished]]
270+
for job in removed_jobs:
271+
# kill unfinished jobs
272+
try:
273+
self.machine.context.kill(job.job_id)
274+
except Exception as e:
275+
dlog.info("Can not kill job %s" % job.job_id)
276+
277+
# remove unfinished tasks
278+
import os,shutil
279+
for task in job.job_task_list:
280+
shutil.rmtree(os.path.join(self.machine.context.local_root, task.task_work_path), ignore_errors=True)
281+
self.belonging_tasks = [task for task in self.belonging_tasks if task not in job.job_task_list]
282+
255283
def check_all_finished(self):
256284
"""check whether all the jobs in the submission.
257285
@@ -559,11 +587,12 @@ def handle_unexpected_job_state(self):
559587
if ( self.fail_count ) > 0 and ( self.fail_count % 3 == 0 ) :
560588
raise RuntimeError(f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times.job_detail:{self}")
561589
self.submit_job()
562-
dlog.info("job:{job_hash} re-submit after terminated; new job_id is {job_id}".format(job_hash=self.job_hash, job_id=self.job_id))
563-
time.sleep(0.2)
564-
self.get_job_state()
565-
dlog.info(f"job:{self.job_hash} job_id:{self.job_id} after re-submitting; the state now is {repr(self.job_state)}")
566-
self.handle_unexpected_job_state()
590+
if self.job_state != JobStatus.unsubmitted:
591+
dlog.info("job:{job_hash} re-submit after terminated; new job_id is {job_id}".format(job_hash=self.job_hash, job_id=self.job_id))
592+
time.sleep(0.2)
593+
self.get_job_state()
594+
dlog.info(f"job:{self.job_hash} job_id:{self.job_id} after re-submitting; the state now is {repr(self.job_state)}")
595+
self.handle_unexpected_job_state()
567596

568597
if job_state == JobStatus.unsubmitted:
569598
dlog.debug(f"job: {self.job_hash} unsubmitted; submit it")
@@ -610,8 +639,8 @@ def register_job_id(self, job_id):
610639

611640
def submit_job(self):
612641
job_id = self.machine.do_submit(self)
642+
self.register_job_id(job_id)
613643
if job_id:
614-
self.register_job_id(job_id)
615644
self.job_state = JobStatus.waiting
616645
else:
617646
self.job_state = JobStatus.unsubmitted
@@ -644,6 +673,8 @@ class Resources(object):
644673
If there are multiple nvidia GPUS on the node, and we want to assign the tasks to different GPUS.
645674
If true, dpdispatcher will manually export environment variable CUDA_VISIBLE_DEVICES to different task.
646675
Usually, this option will be used with Task.task_need_resources variable simultaneously.
676+
ratio_unfinished : float
677+
The ratio of `jobs` that can be unfinished.
647678
para_deg : int
648679
Decide how many tasks will be run in parallel.
649680
Usually run with `strategy['if_cuda_multi_devices']`
@@ -695,12 +726,17 @@ def __init__(self,
695726
# if self.gpu_per_node > 1:
696727
# self.in_para_task_num = 0
697728

729+
if 'if_cuda_multi_devices' not in self.strategy:
730+
self.strategy['if_cuda_multi_devices'] = default_strategy.get('if_cuda_multi_devices')
731+
if 'ratio_unfinished' not in self.strategy:
732+
self.strategy['ratio_unfinished'] = default_strategy.get('ratio_unfinished')
698733
if self.strategy['if_cuda_multi_devices'] is True:
699734
if gpu_per_node < 1:
700735
raise RuntimeError("gpu_per_node can not be smaller than 1 when if_cuda_multi_devices is True")
701736
if number_node != 1:
702737
raise RuntimeError("number_node must be 1 when if_cuda_multi_devices is True")
703-
738+
if self.strategy['ratio_unfinished'] >= 1.0:
739+
raise RuntimeError("ratio_unfinished must be smaller than 1.0")
704740
def __eq__(self, other):
705741
return self.serialize() == other.serialize()
706742

@@ -731,7 +767,6 @@ def deserialize(cls, resources_dict):
731767
gpu_per_node=resources_dict.get('gpu_per_node', 0),
732768
queue_name=resources_dict.get('queue_name', ''),
733769
group_size=resources_dict['group_size'],
734-
735770
custom_flags=resources_dict.get('custom_flags', []),
736771
strategy=resources_dict.get('strategy', default_strategy),
737772
para_deg=resources_dict.get('para_deg', 1),
@@ -776,7 +811,8 @@ def arginfo():
776811
doc_wait_time = 'The waiting time in seconds after a single `task` is submitted'
777812

778813
strategy_args = [
779-
Argument("if_cuda_multi_devices", bool, optional=True, default=True)
814+
Argument("if_cuda_multi_devices", bool, optional=True, default=True),
815+
Argument("ratio_unfinished", float, optional=True, default=0.0)
780816
]
781817
doc_strategy = 'strategies we use to generate job submitting scripts.'
782818
strategy_format = Argument("strategy", dict, strategy_args, optional=True, doc=doc_strategy)

tests/sample_class.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def get_sample_resources_dict(cls):
4040
'queue_name':'T4_4_15',
4141
'group_size':2,
4242
'custom_flags':[],
43-
'strategy':{'if_cuda_multi_devices': False},
43+
'strategy':{'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0},
4444
'para_deg':1,
4545
'module_purge':False,
4646
'module_unload_list':[],

0 commit comments

Comments
 (0)