Skip to content

Commit a53576c

Browse files
committed
fix uploading to lebesgue
1 parent 8c0b4d9 commit a53576c

File tree

3 files changed

+31
-6
lines changed

3 files changed

+31
-6
lines changed

dpdispatcher/dp_cloud_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,13 @@ def check_status(self, job):
119119
# print("api",self.api_version,self.input_data.get('job_group_id'),job.job_id)
120120
check_return = self.api.get_tasks(job_id,group_id)
121121
try:
122-
dp_job_status = check_return[0]["status"]
122+
dp_job_status = check_return["status"]
123123
except IndexError as e:
124124
dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
125125
time.sleep(60)
126126
retry_return = self.api.get_tasks(job_id, group_id)
127127
try:
128-
dp_job_status = retry_return[0]["status"]
128+
dp_job_status = retry_return["status"]
129129
except IndexError as e:
130130
raise RuntimeError(f"cannot find job information in dpcloudserver's database for job {job.job_id} {check_return} {retry_return}")
131131

dpdispatcher/dp_cloud_server_context.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,14 @@ def upload(self, submission):
9191
# zip_path = "/home/felix/workplace/22_dpdispatcher/dpdispatcher-yfb/dpdispatcher/dpcloudserver/t.txt"
9292
# zip_path = self.local_root
9393
bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
94-
for job in tqdm.tqdm(submission.belonging_jobs, desc="Uploading to Lebesgue", bar_format=bar_format):
94+
job_to_be_uploaded = []
95+
result = None
96+
for job in submission.belonging_jobs:
97+
if not self.api.check_job_has_uploaded(job.job_id):
98+
job_to_be_uploaded.append(job)
99+
if len(job_to_be_uploaded) == 0:
100+
return result
101+
for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format):
95102
self.machine.gen_local_script(job)
96103
zip_filename = job.job_hash + '.zip'
97104
oss_task_zip = self._gen_oss_path(job, zip_filename)
@@ -143,7 +150,7 @@ def download(self, submission):
143150
job_hash = job_hashs[each['task_id']]
144151
job_infos[job_hash] = each
145152
bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
146-
for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Downloading to Lebesgue", bar_format=bar_format):
153+
for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Downloading from Lebesgue", bar_format=bar_format):
147154
result_filename = job_hash + '_back.zip'
148155
target_result_zip = os.path.join(self.local_root, result_filename)
149156
self.api.download_from_url(info['result_url'], target_result_zip)

dpdispatcher/dpcloudserver/api.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,10 @@ def get_tasks(self, job_id, group_id, page=1, per_page=10):
210210
)
211211
for each in ret['items']:
212212
if job_id == each["task_id"]:
213-
return [each]
213+
return each
214214
if len(ret['items']) != 0:
215215
return self.get_tasks(job_id, group_id, page=page + 1)
216-
return []
216+
return None
217217

218218
def get_tasks_list(self, group_id, per_page=30):
219219
result = []
@@ -233,4 +233,22 @@ def get_tasks_list(self, group_id, per_page=30):
233233
page += 1
234234
return result
235235

236+
def check_job_has_uploaded(self, job_id):
237+
try:
238+
if not job_id:
239+
return False
240+
if 'job_group_id' in job_id:
241+
ids = job_id.split(":job_group_id:")
242+
job_id, _ = int(ids[0]), int(ids[1])
243+
ret = self.get(f'data/job/{job_id}', {})
244+
if len(ret) == 0:
245+
return False
246+
if ret.get('input_data'):
247+
return True
248+
else:
249+
return False
250+
except ValueError as e:
251+
dlog.error(e)
252+
return False
253+
236254
# %%

0 commit comments

Comments
 (0)