
Commit 3841f7e

Merge pull request #136 from KZHIWEI/master
Machine support Lebesgue V2 API
2 parents f07446e + f63802c commit 3841f7e
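The V2 path is opt-in through the machine's input_data block: api_version selects which Lebesgue endpoints DpCloudServer calls, and grouped makes consecutive submissions reuse the job_group_id returned by the first one. A minimal sketch of the relevant part of a machine definition follows; only api_version, grouped, program_id and input_data itself are grounded in this commit, the surrounding keys and all values are illustrative placeholders.

# Hypothetical machine dict -- only 'input_data', 'api_version', 'grouped'
# and 'program_id' are grounded in this commit; the other keys and all
# values are placeholders for a typical dpdispatcher machine definition.
machine = {
    "batch_type": "DpCloudServer",
    "context_type": "DpCloudServerContext",
    "remote_profile": {
        "program_id": 1234,                 # forwarded to job_create / job_create_v2
        "input_data": {
            "api_version": 2,               # default 1 keeps the legacy job_create path
            "grouped": True,                # reuse the group_id returned by the first submit
            "job_type": "indicate",         # placeholder
            "image_name": "some_image",     # passed through to /data/v2/insert_job
            "scass_type": "some_machine_type",
        },
    },
}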


3 files changed: +92 -19 lines changed


dpdispatcher/dp_cloud_server.py

Lines changed: 52 additions & 16 deletions
@@ -13,6 +13,9 @@ class DpCloudServer(Machine):
     def __init__(self, context):
         self.context = context
         self.input_data = context.remote_profile['input_data'].copy()
+        self.api_version = self.input_data.get('api_version', 1)
+        self.grouped = self.input_data.get('grouped', False)
+        self.group_id = None
 
     def gen_script(self, job):
         shell_script = super(DpCloudServer, self).gen_script(job)
@@ -23,10 +26,10 @@ def gen_script_header(self, job):
         return shell_script_header
 
     def gen_local_script(self, job):
-        script_str = self.gen_script(job)
+        script_str = self.gen_script(job)
         script_file_name = job.script_file_name
         self.context.write_local_file(
-            fname=script_file_name,
+            fname=script_file_name,
             write_str=script_str
         )
         return script_file_name
@@ -42,30 +45,58 @@ def do_submit(self, job):
         input_data['job_resources'] = job_resources
         input_data['command'] = f"bash {job.script_file_name}"
 
-
-        job_id = api.job_create(
-            job_type=input_data['job_type'],
-            oss_path=input_data['job_resources'],
-            input_data=input_data,
-            program_id=self.context.remote_profile.get('program_id', None)
-        )
-
-        job.job_id = job_id
+        job_id = None
+        if self.api_version == 2:
+            job_id, group_id = api.job_create_v2(
+                job_type=input_data['job_type'],
+                oss_path=input_data['job_resources'],
+                input_data=input_data,
+                program_id=self.context.remote_profile.get('program_id', None),
+                group_id=self.group_id
+            )
+            if self.grouped:
+                self.group_id = group_id
+            job.job_id = str(job_id) + ':job_group_id:' + str(group_id)
+            job_id = job.job_id
+        else:
+            job_id = api.job_create(
+                job_type=input_data['job_type'],
+                oss_path=input_data['job_resources'],
+                input_data=input_data,
+                program_id=self.context.remote_profile.get('program_id', None)
+            )
         job.job_state = JobStatus.waiting
         return job_id
 
     def check_status(self, job):
         if job.job_id == '':
             return JobStatus.unsubmitted
-        dlog.debug(f"debug: check_status; job.job_id:{job.job_id}; job.job_hash:{job.job_hash}")
-
-        check_return = api.get_tasks(job.job_id)
+        job_id = job.job_id
+        group_id = None
+        if isinstance(job.job_id, str) and ':job_group_id:' in job.job_id:
+            group_id = None
+            ids = job.job_id.split(":job_group_id:")
+            job_id, group_id = int(ids[0]), int(ids[1])
+            if self.input_data.get('grouped') and ':job_group_id:' not in self.input_data:
+                self.group_id = group_id
+            self.api_version = 2
+        dlog.debug(f"debug: check_status; job.job_id:{job_id}; job.job_hash:{job.job_hash}")
+        check_return = None
+        # print("api",self.api_version,self.input_data.get('job_group_id'),job.job_id)
+        if self.api_version == 2:
+            check_return = api.get_tasks_v2(job_id,group_id)
+        else:
+            check_return = api.get_tasks(job_id)
         try:
             dp_job_status = check_return[0]["status"]
         except IndexError as e:
             dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
             time.sleep(60)
-            retry_return = api.get_tasks(job.job_id)
+            retry_return = None
+            if self.api_version == 2:
+                retry_return = api.get_tasks_v2(job_id,group_id)
+            else:
+                retry_return = api.get_tasks(job_id)
             try:
                 dp_job_status = retry_return[0]["status"]
             except IndexError as e:
@@ -91,8 +122,13 @@ def map_dp_job_state(status):
             -1:JobStatus.terminated,
             0:JobStatus.waiting,
             1:JobStatus.running,
-            2:JobStatus.finished
+            2:JobStatus.finished,
+            3:JobStatus.waiting,
+            4:JobStatus.running,
+            5:JobStatus.terminated
         }
+        if status not in map_dict:
+            return JobStatus.unknown
         return map_dict[status]
 
 
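With api_version == 2, do_submit above packs both identifiers into job.job_id as '<job_id>:job_group_id:<group_id>', and check_status splits them apart again before calling get_tasks_v2. A standalone sketch of that round trip, written here only to illustrate the encoding (these helper functions do not exist in the commit):

# Illustrative helpers mirroring the encode/decode logic in do_submit/check_status.
def encode_job_id(job_id, group_id):
    # do_submit: str(job_id) + ':job_group_id:' + str(group_id)
    return str(job_id) + ':job_group_id:' + str(group_id)

def decode_job_id(stored):
    # check_status: split the composite id back into its two integer parts.
    if isinstance(stored, str) and ':job_group_id:' in stored:
        ids = stored.split(':job_group_id:')
        return int(ids[0]), int(ids[1])
    return stored, None  # legacy V1 id, no group

print(decode_job_id(encode_job_id(1001, 77)))  # (1001, 77)

Note also that map_dp_job_state now maps the V2 numeric states 3/4/5 to waiting/running/terminated and falls back to JobStatus.unknown for any unmapped value.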
dpdispatcher/dp_cloud_server_context.py

Lines changed: 1 addition & 3 deletions
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # coding: utf-8
-#%%
+# %%
 from dpdispatcher.base_context import BaseContext
 import os
 # from dpdispatcher import dlog
@@ -55,7 +55,6 @@ def bind_submission(self, submission):
         self.submission_hash = submission.submission_hash
 
         self.machine = submission.machine
-
 
     # def zip_files(self, submission):
     #     file_uuid = uuid.uuid1().hex
@@ -91,7 +90,6 @@ def upload(self, submission):
            zip_task_file,
            file_list=upload_file_list
        )
-
        result = api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
        return result
        # return oss_task_zip

dpdispatcher/dpcloudserver/api.py

Lines changed: 39 additions & 0 deletions
@@ -14,6 +14,7 @@
 from .retcode import RETCODE
 from .config import HTTP_TIME_OUT, API_HOST
 token = ''
+
 def get(url, params):
     global token
     headers = {'Authorization': "jwt " + token}
@@ -118,6 +119,28 @@ def job_create(job_type, oss_path, input_data, program_id=None):
     return ret['job_id']
 
 
+def job_create_v2(job_type, oss_path, input_data, program_id=None, group_id=None):
+    post_data = {
+        'job_type': job_type,
+        'oss_path': oss_path,
+    }
+    if program_id is not None:
+        post_data["program_id"] = program_id
+    if group_id is not None:
+        post_data["job_group_id"] = group_id
+    if input_data.get('command') is not None:
+        post_data["cmd"] = input_data.get('command')
+    if input_data.get('backward_files') is not None:
+        post_data["out_files"] = input_data.get('backward_files')
+    input_keys = ['job_group_id', 'job_name', 'rerun', 'image_name', 'disk_size', 'scass_type',
+                  'instance_group_id', 'log_file', 'platform', 'region', 'zone', 'on_demand']
+    for key in input_keys:
+        if key in input_data:
+            post_data[key] = input_data[key]
+    ret = post('/data/v2/insert_job', post_data)
+    group_id = ret.get('job_group_id')
+    return ret['job_id'], group_id
+
 def get_jobs(page=1, per_page=10):
     ret = get(
         '/data/jobs',
@@ -139,5 +162,21 @@ def get_tasks(job_id, page=1, per_page=10):
     )
     return ret['items']
 
+
+def get_tasks_v2(job_id, group_id, page=1, per_page=10):
+    ret = get(
+        f'data/job/{group_id}/tasks',
+        {
+            'page': page,
+            'per_page': per_page,
+        }
+    )
+    for each in ret['items']:
+        if job_id == each["task_id"]:
+            return [each]
+    if len(ret['items']) != 0:
+        return get_tasks_v2(job_id, group_id, page=page+1)
+    return []
+
 #%%
 
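Putting the two new helpers together: a V2 submission calls job_create_v2 once to obtain both ids, then polls get_tasks_v2, which pages through data/job/<group_id>/tasks until it finds the matching task_id. A rough usage sketch, not part of the commit, assuming the module-level token has already been set by the usual login step and using placeholder values throughout:

# Hypothetical driver code; all values are placeholders and the module-level
# token is assumed to have been set by the usual login step.
from dpdispatcher.dpcloudserver import api

input_data = {
    'job_type': 'indicate',          # placeholder
    'command': 'bash run.sh',        # becomes post_data['cmd']
    'image_name': 'some_image',      # passed through via input_keys
}

# First submission: group_id=None lets the server open a new job group.
job_id, group_id = api.job_create_v2(
    job_type=input_data['job_type'],
    oss_path='dpcloudserver/placeholder/task.zip',   # placeholder OSS path
    input_data=input_data,
    program_id=1234,                                 # placeholder
    group_id=None,
)

# Later submissions can pass group_id back in so their jobs join the same group.
tasks = api.get_tasks_v2(job_id, group_id)
if tasks:
    status = tasks[0]["status"]   # numeric state consumed by DpCloudServer.map_dp_job_state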