Skip to content

Commit 14628e0

Browse files
committed
remove version 1 and add tqdm
1 parent 8990a3b commit 14628e0

File tree

5 files changed

+32
-68
lines changed

5 files changed

+32
-68
lines changed

dpdispatcher/dp_cloud_server.py

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ def __init__(self, context):
1818
self.input_data = context.remote_profile['input_data'].copy()
1919
self.api_version = 2
2020
if 'api_version' in self.input_data:
21-
self.api_version = self.input_data.get('api_version')
21+
self.api_version = self.input_data.get('api_version', 2)
2222
if 'lebesgue_version' in self.input_data:
23-
self.api_version = self.input_data.get('lebesgue_version')
23+
self.api_version = self.input_data.get('lebesgue_version', 2)
2424
self.grouped = self.input_data.get('grouped', False)
2525
email = context.remote_profile.get("email", None)
2626
username = context.remote_profile.get('username', None)
@@ -33,7 +33,7 @@ def __init__(self, context):
3333
if password is None:
3434
raise ValueError("can not find password in remote_profile, please check your machine file.")
3535
if self.api_version == 1:
36-
warnings.warn('api version 1 is deprecated and will be removed in a future version. Use version 2 instead.', DeprecationWarning)
36+
warnings.warn('api version 1 is deprecated. Use version 2 instead.', DeprecationWarning)
3737
self.api = API(email, password)
3838
self.group_id = None
3939

@@ -88,26 +88,17 @@ def do_submit(self, job):
8888
# input_data['backward_files'] = self._gen_backward_files_list(job)
8989
if self.context.remote_profile.get('program_id') is None:
9090
warnings.warn('program_id will be compulsory in the future.')
91-
job_id = None
92-
if self.api_version == 2:
93-
job_id, group_id = self.api.job_create_v2(
94-
job_type=input_data['job_type'],
95-
oss_path=input_data['job_resources'],
96-
input_data=input_data,
97-
program_id=self.context.remote_profile.get('program_id', None),
98-
group_id=self.group_id
99-
)
100-
if self.grouped:
101-
self.group_id = group_id
102-
job.job_id = str(job_id) + ':job_group_id:' + str(group_id)
103-
job_id = job.job_id
104-
else:
105-
job_id = self.api.job_create(
106-
job_type=input_data['job_type'],
107-
oss_path=input_data['job_resources'],
108-
input_data=input_data,
109-
program_id=self.context.remote_profile.get('program_id', None)
110-
)
91+
job_id, group_id = self.api.job_create(
92+
job_type=input_data['job_type'],
93+
oss_path=input_data['job_resources'],
94+
input_data=input_data,
95+
program_id=self.context.remote_profile.get('program_id', None),
96+
group_id=self.group_id
97+
)
98+
if self.grouped:
99+
self.group_id = group_id
100+
job.job_id = str(job_id) + ':job_group_id:' + str(group_id)
101+
job_id = job.job_id
111102
job.job_state = JobStatus.waiting
112103
return job_id
113104

@@ -126,20 +117,13 @@ def check_status(self, job):
126117
dlog.debug(f"debug: check_status; job.job_id:{job_id}; job.job_hash:{job.job_hash}")
127118
check_return = None
128119
# print("api",self.api_version,self.input_data.get('job_group_id'),job.job_id)
129-
if self.api_version == 2:
130-
check_return = self.api.get_tasks_v2(job_id,group_id)
131-
else:
132-
check_return = self.api.get_tasks(job_id)
120+
check_return = self.api.get_tasks(job_id,group_id)
133121
try:
134122
dp_job_status = check_return[0]["status"]
135123
except IndexError as e:
136124
dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
137125
time.sleep(60)
138-
retry_return = None
139-
if self.api_version == 2:
140-
retry_return = self.api.get_tasks_v2(job_id, group_id)
141-
else:
142-
retry_return = self.api.get_tasks(job_id)
126+
retry_return = self.api.get_tasks(job_id, group_id)
143127
try:
144128
dp_job_status = retry_return[0]["status"]
145129
except IndexError as e:

dpdispatcher/dp_cloud_server_context.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .dpcloudserver.api import API
1414
from .dpcloudserver import zip_file
1515
import shutil
16+
import tqdm
1617
# from zip_file import zip_files
1718
DP_CLOUD_SERVER_HOME_DIR = os.path.join(
1819
os.path.expanduser('~'),
@@ -89,8 +90,8 @@ def upload(self, submission):
8990

9091
# zip_path = "/home/felix/workplace/22_dpdispatcher/dpdispatcher-yfb/dpdispatcher/dpcloudserver/t.txt"
9192
# zip_path = self.local_root
92-
93-
for job in submission.belonging_jobs:
93+
bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
94+
for job in tqdm.tqdm(submission.belonging_jobs, desc="Uploading to Lebesgue", bar_format=bar_format):
9495
self.machine.gen_local_script(job)
9596
zip_filename = job.job_hash + '.zip'
9697
oss_task_zip = self._gen_oss_path(job, zip_filename)
@@ -124,15 +125,12 @@ def download(self, submission):
124125
group_id = None
125126
job_infos = {}
126127
for job in jobs:
127-
if isinstance(job.job_id, str) and ':job_group_id:' in job.job_id:
128-
ids = job.job_id.split(":job_group_id:")
129-
jid, gid = int(ids[0]), int(ids[1])
130-
job_hashs[jid] = job.job_hash
131-
group_id = gid
132-
else:
133-
job_infos[job.job_hash] = self.api.get_tasks(job.job_id)[0]
128+
ids = job.job_id.split(":job_group_id:")
129+
jid, gid = int(ids[0]), int(ids[1])
130+
job_hashs[jid] = job.job_hash
131+
group_id = gid
134132
if group_id is not None:
135-
job_result = self.api.get_tasks_v2_list(group_id)
133+
job_result = self.api.get_tasks_list(group_id)
136134
for each in job_result:
137135
if 'result_url' in each and each['result_url'] != '' and each['status'] == 2:
138136
job_hash = ''
@@ -143,7 +141,8 @@ def download(self, submission):
143141
else:
144142
job_hash = job_hashs[each['task_id']]
145143
job_infos[job_hash] = each
146-
for job_hash, info in job_infos.items():
144+
bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
145+
for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Downloading to Lebesgue", bar_format=bar_format):
147146
result_filename = job_hash + '_back.zip'
148147
target_result_zip = os.path.join(self.local_root, result_filename)
149148
self.api.download_from_url(info['result_url'], target_result_zip)

dpdispatcher/dpcloudserver/api.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,8 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
170170
# print('debug:upload_result:', result, dir())
171171
return result
172172

173-
def job_create(self, job_type, oss_path, input_data, program_id=None):
174-
post_data = {
175-
'job_type': job_type,
176-
'oss_path': oss_path,
177-
'input_data': input_data,
178-
}
179-
if program_id is not None:
180-
post_data["program_id"] = program_id
181-
ret = self.post('/data/insert_job', post_data)
182-
return ret['job_id']
183173

184-
def job_create_v2(self, job_type, oss_path, input_data, program_id=None, group_id=None):
174+
def job_create(self, job_type, oss_path, input_data, program_id=None, group_id=None):
185175
post_data = {
186176
'job_type': job_type,
187177
'oss_path': oss_path,
@@ -210,17 +200,7 @@ def get_jobs(self, page=1, per_page=10):
210200
)
211201
return ret['items']
212202

213-
def get_tasks(self, job_id, page=1, per_page=10):
214-
ret = self.get(
215-
f'data/job/{job_id}/tasks',
216-
{
217-
'page': page,
218-
'per_page': per_page,
219-
}
220-
)
221-
return ret['items']
222-
223-
def get_tasks_v2(self, job_id, group_id, page=1, per_page=10):
203+
def get_tasks(self, job_id, group_id, page=1, per_page=10):
224204
ret = self.get(
225205
f'data/job/{group_id}/tasks',
226206
{
@@ -232,10 +212,10 @@ def get_tasks_v2(self, job_id, group_id, page=1, per_page=10):
232212
if job_id == each["task_id"]:
233213
return [each]
234214
if len(ret['items']) != 0:
235-
return self.get_tasks_v2(job_id, group_id, page=page + 1)
215+
return self.get_tasks(job_id, group_id, page=page + 1)
236216
return []
237217

238-
def get_tasks_v2_list(self, group_id, per_page=30):
218+
def get_tasks_list(self, group_id, per_page=30):
239219
result = []
240220
page = 0
241221
while True:

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
paramiko
22
dargs>=0.2.6
33
oss2
4+
tqdm

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
install_requires=install_requires,
4242
extras_require={
4343
'docs': ['sphinx', 'recommonmark', 'sphinx_rtd_theme>=1.0.0rc1', 'numpydoc'],
44-
"cloudserver": ["oss2"],
44+
"cloudserver": ["oss2", "tqdm"],
4545
":python_version<'3.7'": ["typing_extensions"],
4646
},
4747
entry_points={

0 commit comments

Comments
 (0)