Skip to content

Commit c3fac38

Browse files
committed
add job group id
1 parent c172c38 commit c3fac38

File tree

2 files changed

+61
-20
lines changed

2 files changed

+61
-20
lines changed

dpdispatcher/dp_cloud_server.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class DpCloudServer(Machine):
1313
def __init__(self, context):
1414
self.context = context
1515
self.input_data = context.remote_profile['input_data'].copy()
16+
self.api_version = self.input_data.get('api_version', 1)
1617

1718
def gen_script(self, job):
1819
shell_script = super(DpCloudServer, self).gen_script(job)
@@ -42,30 +43,53 @@ def do_submit(self, job):
4243
input_data['job_resources'] = job_resources
4344
input_data['command'] = f"bash {job.script_file_name}"
4445

45-
46-
job_id = api.job_create(
47-
job_type=input_data['job_type'],
48-
oss_path=input_data['job_resources'],
49-
input_data=input_data,
50-
program_id=self.context.remote_profile.get('program_id', None)
51-
)
52-
53-
job.job_id = job_id
46+
job_id = None
47+
if self.api_version == 2:
48+
job_id, group_id = api.job_create_v2(
49+
job_type=input_data['job_type'],
50+
oss_path=input_data['job_resources'],
51+
input_data=input_data,
52+
program_id=self.context.remote_profile.get('program_id', None)
53+
)
54+
self.input_data['job_group_id'] = group_id
55+
job.job_id = str(job_id) + ':job_group_id' + str(group_id)
56+
job_id = job.job_id
57+
else:
58+
job_id = api.job_create(
59+
job_type=input_data['job_type'],
60+
oss_path=input_data['job_resources'],
61+
input_data=input_data,
62+
program_id=self.context.remote_profile.get('program_id', None)
63+
)
5464
job.job_state = JobStatus.waiting
5565
return job_id
5666

5767
def check_status(self, job):
5868
if job.job_id == '':
5969
return JobStatus.unsubmitted
70+
job_id = job.job_id
71+
if type(job.job_id) is str and ':job_group_id' in job.job_id:
72+
ids = job.job_id.split(":job_group_id")
73+
job.job_id ,self.input_data["job_group_id"] = int(ids[0]), int(ids[1])
74+
self.api_version = 2
75+
job_id = job.job_id
6076
dlog.debug(f"debug: check_status; job.job_id:{job.job_id}; job.job_hash:{job.job_hash}")
61-
62-
check_return = api.get_tasks(job.job_id)
77+
check_return = None
78+
if self.api_version == 2:
79+
check_return = api.get_tasks_v2(job_id,self.input_data.get('job_group_id'))
80+
else:
81+
check_return = api.get_tasks(job_id)
6382
try:
6483
dp_job_status = check_return[0]["status"]
6584
except IndexError as e:
66-
dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
85+
dlog.error(f"cannot find job information in check_return. job {job_id}. check_return:{check_return}; retry one more time after 60 seconds")
6786
time.sleep(60)
68-
retry_return = api.get_tasks(job.job_id)
87+
retry_return = None
88+
retry_return = api.get_tasks(job_id)
89+
if self.api_version == 2:
90+
retry_return = api.get_tasks_v2(job_id,self.input_data.get('job_group_id'))
91+
else:
92+
retry_return = api.get_tasks(job_id)
6993
try:
7094
dp_job_status = retry_return[0]["status"]
7195
except IndexError as e:
@@ -91,8 +115,13 @@ def map_dp_job_state(status):
91115
-1:JobStatus.terminated,
92116
0:JobStatus.waiting,
93117
1:JobStatus.running,
94-
2:JobStatus.finished
118+
2:JobStatus.finished,
119+
3:JobStatus.waiting,
120+
4:JobStatus.running,
121+
5:JobStatus.terminated
95122
}
123+
if status not in map_dict:
124+
return JobStatus.unknown
96125
return map_dict[status]
97126

98127

dpdispatcher/dpcloudserver/api.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .config import HTTP_TIME_OUT, API_HOST
1616
token = ''
1717
group_id = None
18+
1819
def get(url, params):
1920
global token
2021
headers = {'Authorization': "jwt " + token}
@@ -108,9 +109,6 @@ def upload(oss_task_zip, zip_task_file, endpoint, bucket_name):
108109

109110

110111
def job_create(job_type, oss_path, input_data, program_id=None):
111-
api_version = input_data.get('api_version',1)
112-
if api_version == 2:
113-
return job_create_v2(job_type,oss_path,input_data, program_id=program_id)
114112
post_data = {
115113
'job_type': job_type,
116114
'oss_path': oss_path,
@@ -137,14 +135,14 @@ def job_create_v2(job_type, oss_path, input_data, program_id=None):
137135
post_data["job_group_id"] = group_id
138136
if input_data.get('job_name') is not None:
139137
post_data["job_name"] = input_data.get('job_name')
138+
if input_data.get('rerun') is not None:
139+
post_data["rerun"] = input_data.get('rerun')
140140
if input_data.get('image_name') is not None:
141141
post_data["image_name"] = input_data.get('image_name')
142142
if input_data.get('disk_size') is not None:
143143
post_data["disk_size"] = input_data.get('disk_size')
144144
if input_data.get('scass_type') is not None:
145145
post_data["scass_type"] = input_data.get('scass_type')
146-
if input_data.get('scass_type') is not None:
147-
post_data["scass_type"] = input_data.get('scass_type')
148146
if input_data.get('instance_group_id') is not None:
149147
post_data["instance_group_id"] = input_data.get('instance_group_id')
150148
if input_data.get('command') is not None:
@@ -163,7 +161,7 @@ def job_create_v2(job_type, oss_path, input_data, program_id=None):
163161
post_data["on_demand"] = input_data.get('on_demand')
164162
ret = post('/data/v2/insert_job', post_data)
165163
group_id = ret.get('job_group_id')
166-
return ret['job_id']
164+
return ret['job_id'], group_id
167165

168166
def get_jobs(page=1, per_page=10):
169167
ret = get(
@@ -186,5 +184,19 @@ def get_tasks(job_id, page=1, per_page=10):
186184
)
187185
return ret['items']
188186

187+
188+
def get_tasks_v2(job_id, group_id, page=1, per_page=10):
189+
ret = get(
190+
f'data/job/{group_id}/tasks',
191+
{
192+
'page': page,
193+
'per_page': per_page,
194+
}
195+
)
196+
for each in ret['items']:
197+
if job_id == each["task_id"]:
198+
return [each]
199+
return []
200+
189201
#%%
190202

0 commit comments

Comments
 (0)