Skip to content

Commit c3d32eb

Browse files
committed
support v2
1 parent c3fac38 commit c3d32eb

File tree

3 files changed

+24
-20
lines changed

3 files changed

+24
-20
lines changed

dpdispatcher/dp_cloud_server.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ def __init__(self, context):
1414
self.context = context
1515
self.input_data = context.remote_profile['input_data'].copy()
1616
self.api_version = self.input_data.get('api_version', 1)
17+
self.grouped = self.input_data.get('grouped', False)
18+
self.group_id = None
1719

1820
def gen_script(self, job):
1921
shell_script = super(DpCloudServer, self).gen_script(job)
@@ -24,10 +26,10 @@ def gen_script_header(self, job):
2426
return shell_script_header
2527

2628
def gen_local_script(self, job):
27-
script_str = self.gen_script(job)
29+
script_str = self.gen_script(job)
2830
script_file_name = job.script_file_name
2931
self.context.write_local_file(
30-
fname=script_file_name,
32+
fname=script_file_name,
3133
write_str=script_str
3234
)
3335
return script_file_name
@@ -49,9 +51,11 @@ def do_submit(self, job):
4951
job_type=input_data['job_type'],
5052
oss_path=input_data['job_resources'],
5153
input_data=input_data,
52-
program_id=self.context.remote_profile.get('program_id', None)
54+
program_id=self.context.remote_profile.get('program_id', None),
55+
group_id=self.group_id
5356
)
54-
self.input_data['job_group_id'] = group_id
57+
if self.grouped:
58+
self.group_id = group_id
5559
job.job_id = str(job_id) + ':job_group_id' + str(group_id)
5660
job_id = job.job_id
5761
else:
@@ -68,26 +72,29 @@ def check_status(self, job):
6872
if job.job_id == '':
6973
return JobStatus.unsubmitted
7074
job_id = job.job_id
71-
if type(job.job_id) is str and ':job_group_id' in job.job_id:
75+
group_id = None
76+
if isinstance(job.job_id,str) and ':job_group_id' in job.job_id:
77+
group_id = None
7278
ids = job.job_id.split(":job_group_id")
73-
job.job_id ,self.input_data["job_group_id"] = int(ids[0]), int(ids[1])
79+
job_id, group_id = int(ids[0]), int(ids[1])
80+
if self.input_data.get('grouped') and 'job_group_id' not in self.input_data:
81+
self.group_id = group_id
7482
self.api_version = 2
75-
job_id = job.job_id
76-
dlog.debug(f"debug: check_status; job.job_id:{job.job_id}; job.job_hash:{job.job_hash}")
83+
dlog.debug(f"debug: check_status; job.job_id:{job_id}; job.job_hash:{job.job_hash}")
7784
check_return = None
85+
# print("api",self.api_version,self.input_data.get('job_group_id'),job.job_id)
7886
if self.api_version == 2:
79-
check_return = api.get_tasks_v2(job_id,self.input_data.get('job_group_id'))
87+
check_return = api.get_tasks_v2(job_id,group_id)
8088
else:
8189
check_return = api.get_tasks(job_id)
8290
try:
8391
dp_job_status = check_return[0]["status"]
8492
except IndexError as e:
85-
dlog.error(f"cannot find job information in check_return. job {job_id}. check_return:{check_return}; retry one more time after 60 seconds")
93+
dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
8694
time.sleep(60)
8795
retry_return = None
88-
retry_return = api.get_tasks(job_id)
8996
if self.api_version == 2:
90-
retry_return = api.get_tasks_v2(job_id,self.input_data.get('job_group_id'))
97+
retry_return = api.get_tasks_v2(job_id,group_id)
9198
else:
9299
retry_return = api.get_tasks(job_id)
93100
try:

dpdispatcher/dp_cloud_server_context.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22
# coding: utf-8
3-
#%%
3+
# %%
44
from dpdispatcher.base_context import BaseContext
55
import os
66
# from dpdispatcher import dlog
@@ -55,7 +55,6 @@ def bind_submission(self, submission):
5555
self.submission_hash = submission.submission_hash
5656

5757
self.machine = submission.machine
58-
5958

6059
# def zip_files(self, submission):
6160
# file_uuid = uuid.uuid1().hex
@@ -91,7 +90,6 @@ def upload(self, submission):
9190
zip_task_file,
9291
file_list=upload_file_list
9392
)
94-
9593
result = api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
9694
return result
9795
# return oss_task_zip

dpdispatcher/dpcloudserver/api.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from .retcode import RETCODE
1515
from .config import HTTP_TIME_OUT, API_HOST
1616
token = ''
17-
group_id = None
1817

1918
def get(url, params):
2019
global token
@@ -120,8 +119,7 @@ def job_create(job_type, oss_path, input_data, program_id=None):
120119
return ret['job_id']
121120

122121

123-
def job_create_v2(job_type, oss_path, input_data, program_id=None):
124-
global group_id
122+
def job_create_v2(job_type, oss_path, input_data, program_id=None, group_id=None):
125123
post_data = {
126124
'job_type': job_type,
127125
'oss_path': oss_path,
@@ -130,8 +128,7 @@ def job_create_v2(job_type, oss_path, input_data, program_id=None):
130128
post_data["program_id"] = program_id
131129
if input_data.get('job_group_id') is not None:
132130
post_data["job_group_id"] = input_data.get('job_group_id')
133-
elif input_data.get('grouped') is not None and input_data.get('grouped'):
134-
if group_id is not None:
131+
if group_id is not None:
135132
post_data["job_group_id"] = group_id
136133
if input_data.get('job_name') is not None:
137134
post_data["job_name"] = input_data.get('job_name')
@@ -196,6 +193,8 @@ def get_tasks_v2(job_id, group_id, page=1, per_page=10):
196193
for each in ret['items']:
197194
if job_id == each["task_id"]:
198195
return [each]
196+
if len(ret['items']) != 0:
197+
return get_tasks_v2(job_id, group_id, page=2)
199198
return []
200199

201200
#%%

0 commit comments

Comments
 (0)