Skip to content

Commit 28f2730

Browse files
committed
improve upload
1 parent 67f934d commit 28f2730

File tree

3 files changed

+58
-11
lines changed

3 files changed

+58
-11
lines changed

dpdispatcher/dp_cloud_server.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import shutil
2+
13
from dpdispatcher.JobStatus import JobStatus
24
from dpdispatcher import dlog
5+
from dpdispatcher.dpcloudserver import zip_file
36
from dpdispatcher.machine import Machine
47
from dpdispatcher.dpcloudserver.api import API
58
from dpdispatcher.dpcloudserver.config import ALI_OSS_BUCKET_URL
@@ -26,7 +29,7 @@ def __init__(self, context):
2629
username = context.remote_profile.get('username', None)
2730
password = context.remote_profile.get('password', None)
2831
if email is None and username is not None:
29-
raise dlog.exception("username is no longer support in current version, "
32+
dlog.exception("username is no longer support in current version, "
3033
"please consider use email instead of username.", DeprecationWarning)
3134
if email is None:
3235
raise ValueError("can not find email in remote_profile, please check your machine file.")
@@ -130,8 +133,27 @@ def check_status(self, job):
130133
raise RuntimeError(f"cannot find job information in dpcloudserver's database for job {job.job_id} {check_return} {retry_return}")
131134

132135
job_state = self.map_dp_job_state(dp_job_status)
136+
if job_state == JobStatus.finished:
137+
self._download_job(job)
133138
return job_state
134139

140+
141+
def _download_job(self, job):
142+
job_url = self.api.get_job_result_url(job.job_id)
143+
if not job_url:
144+
return
145+
job_hash = job.job_hash
146+
result_filename = job_hash + '_back.zip'
147+
target_result_zip = os.path.join(self.context.local_root, result_filename)
148+
self.api.download_from_url(job_url, target_result_zip)
149+
zip_file.unzip_file(target_result_zip, out_dir=self.context.local_root)
150+
try:
151+
os.makedirs(os.path.join(self.context.local_root, 'backup'), exist_ok=True)
152+
shutil.move(target_result_zip,
153+
os.path.join(self.context.local_root, 'backup', os.path.split(target_result_zip)[1]))
154+
except (OSError, shutil.Error) as e:
155+
dlog.exception("unable to backup file, " + str(e))
156+
135157
def check_finish_tag(self, job):
136158
job_tag_finished = job.job_hash + '_job_tag_finished'
137159
dlog.info('check if job finished: ',job.job_id, job_tag_finished)

dpdispatcher/dp_cloud_server_context.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ def upload(self, submission):
124124
)
125125
result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
126126
self._backup(self.local_root, upload_zip, keep_backup=self.remote_profile.get('keep_backup', True))
127-
print() # empty print because tqdm may not print a newline along with dlog
128127
return result
129128
# return oss_task_zip
130129
# api.upload(self.oss_task_dir, zip_task_file)
@@ -152,27 +151,37 @@ def download(self, submission):
152151
job_hash = job_hashs[each['task_id']]
153152
job_infos[job_hash] = each
154153
bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
155-
for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Downloading from Lebesgue", bar_format=bar_format):
154+
for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue", bar_format=bar_format):
156155
result_filename = job_hash + '_back.zip'
157156
target_result_zip = os.path.join(self.local_root, result_filename)
157+
if self._check_if_job_has_already_downloaded(target_result_zip, self.local_root):
158+
continue
158159
self.api.download_from_url(info['result_url'], target_result_zip)
159160
zip_file.unzip_file(target_result_zip, out_dir=self.local_root)
160161
self._backup(self.local_root, target_result_zip, keep_backup=self.remote_profile.get('keep_backup', True))
161-
print() # empty print because tqdm may not print a newline along with dlog
162+
self._clean_backup(self.local_root, keep_backup=self.remote_profile.get('keep_backup', True))
162163
return True
163164

165+
def _check_if_job_has_already_downloaded(self, target, local_root):
166+
backup_file_location = os.path.join(local_root, 'backup', os.path.split(target)[1])
167+
if os.path.exists(backup_file_location):
168+
return False
169+
else:
170+
return True
171+
164172
def _backup(self, local_root, target, keep_backup=True):
165173
try:
166-
if keep_backup:
167-
# move to backup directory
168-
os.makedirs(os.path.join(local_root, 'backup'), exist_ok=True)
169-
shutil.move(target,
170-
os.path.join(local_root, 'backup', os.path.split(target)[1]))
171-
else:
172-
os.remove(target)
174+
# move to backup directory
175+
os.makedirs(os.path.join(local_root, 'backup'), exist_ok=True)
176+
shutil.move(target,
177+
os.path.join(local_root, 'backup', os.path.split(target)[1]))
173178
except (OSError, shutil.Error) as e:
174179
dlog.exception("unable to backup file, " + str(e))
175180

181+
def _clean_backup(self, local_root, keep_backup=True):
182+
if not keep_backup:
183+
os.removedirs(os.path.join(local_root, 'backup'))
184+
176185
def write_file(self, fname, write_str):
177186
result = self.write_home_file(fname, write_str)
178187
return result

dpdispatcher/dpcloudserver/api.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,20 @@ def check_job_has_uploaded(self, job_id):
251251
dlog.error(e)
252252
return False
253253

254+
def get_job_result_url(self, job_id):
255+
try:
256+
if not job_id:
257+
return None
258+
if 'job_group_id' in job_id:
259+
ids = job_id.split(":job_group_id:")
260+
job_id, _ = int(ids[0]), int(ids[1])
261+
ret = self.get(f'data/job/{job_id}', {})
262+
if 'result_url' in ret and len(ret['result_url']) != 0:
263+
return ret.get('result_url')
264+
else:
265+
return None
266+
except ValueError as e:
267+
dlog.error(e)
268+
return None
269+
254270
# %%

0 commit comments

Comments
 (0)