Skip to content

Commit c8e9b20

Browse files
chore: make block_call as abstractmethod; merge block_checkcall (#484)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced new `block_call` methods in various contexts to enhance error handling for unsupported operations. - Added `block_checkcall` method to allow for asynchronous command execution in certain contexts. - **Bug Fixes** - Enhanced error feedback when unsupported operations are attempted in several contexts. - **Chores** - Removed the `block_checkcall` method from multiple contexts, simplifying command execution and error management. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent dcaff73 commit c8e9b20

File tree

7 files changed

+76
-68
lines changed

7 files changed

+76
-68
lines changed

dpdispatcher/base_context.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABCMeta, abstractmethod
2-
from typing import List, Tuple
2+
from typing import Any, List, Tuple
33

44
from dargs import Argument
55

@@ -73,6 +73,66 @@ def read_file(self, fname):
7373
def check_finish(self, proc):
7474
raise NotImplementedError("abstract method")
7575

76+
def block_checkcall(self, cmd, asynchronously=False) -> Tuple[Any, Any, Any]:
77+
"""Run command with arguments. Wait for command to complete.
78+
79+
Parameters
80+
----------
81+
cmd : str
82+
The command to run.
83+
asynchronously : bool, optional, default=False
84+
Run command asynchronously. If True, `nohup` will be used to run the command.
85+
86+
Returns
87+
-------
88+
stdin
89+
standard inout
90+
stdout
91+
standard output
92+
stderr
93+
standard error
94+
95+
Raises
96+
------
97+
RuntimeError
98+
when the return code is not zero
99+
"""
100+
if asynchronously:
101+
cmd = f"nohup {cmd} >/dev/null &"
102+
exit_status, stdin, stdout, stderr = self.block_call(cmd)
103+
if exit_status != 0:
104+
raise RuntimeError(
105+
"Get error code %d in calling %s with job: %s . message: %s"
106+
% (
107+
exit_status,
108+
cmd,
109+
self.submission.submission_hash,
110+
stderr.read().decode("utf-8"),
111+
)
112+
)
113+
return stdin, stdout, stderr
114+
115+
@abstractmethod
116+
def block_call(self, cmd) -> Tuple[int, Any, Any, Any]:
117+
"""Run command with arguments. Wait for command to complete.
118+
119+
Parameters
120+
----------
121+
cmd : str
122+
The command to run.
123+
124+
Returns
125+
-------
126+
exit_status
127+
exit code
128+
stdin
129+
standard inout
130+
stdout
131+
standard output
132+
stderr
133+
standard error
134+
"""
135+
76136
@classmethod
77137
def machine_arginfo(cls) -> Argument:
78138
"""Generate the machine arginfo.

dpdispatcher/contexts/dp_cloud_server_context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,11 @@ def machine_subfields(cls) -> List[Argument]:
335335
)
336336
]
337337

338+
def block_call(self, cmd):
339+
raise RuntimeError(
340+
"Unsupported method. You may use an unsupported combination of the machine and the context."
341+
)
342+
338343

339344
DpCloudServerContext = BohriumContext
340345
LebesgueContext = BohriumContext

dpdispatcher/contexts/hdfs_context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,8 @@ def write_file(self, fname, write_str):
244244

245245
def read_file(self, fname):
246246
return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname))
247+
248+
def block_call(self, cmd):
249+
raise RuntimeError(
250+
"Unsupported method. You may use an unsupported combination of the machine and the context."
251+
)

dpdispatcher/contexts/lazy_local_context.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,23 +112,6 @@ def download(
112112
# else:
113113
# raise RuntimeError('do not find download file ' + fname)
114114

115-
def block_checkcall(self, cmd):
116-
# script_dir = os.path.join(self.local_root, self.submission.work_base)
117-
# os.chdir(script_dir)
118-
proc = sp.Popen(
119-
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
120-
)
121-
o, e = proc.communicate()
122-
stdout = SPRetObj(o)
123-
stderr = SPRetObj(e)
124-
code = proc.returncode
125-
if code != 0:
126-
raise RuntimeError(
127-
"Get error code %d in locally calling %s with job: %s ",
128-
(code, cmd, self.submission.submission_hash),
129-
)
130-
return None, stdout, stderr
131-
132115
def block_call(self, cmd):
133116
proc = sp.Popen(
134117
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE

dpdispatcher/contexts/local_context.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -288,21 +288,6 @@ def download(
288288
# no nothing in the case of linked files
289289
pass
290290

291-
def block_checkcall(self, cmd):
292-
proc = sp.Popen(
293-
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
294-
)
295-
o, e = proc.communicate()
296-
stdout = SPRetObj(o)
297-
stderr = SPRetObj(e)
298-
code = proc.returncode
299-
if code != 0:
300-
raise RuntimeError(
301-
f"Get error code {code} in locally calling {cmd} with job: {self.submission.submission_hash}"
302-
f"\nStandard error: {stderr}"
303-
)
304-
return None, stdout, stderr
305-
306291
def block_call(self, cmd):
307292
proc = sp.Popen(
308293
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE

dpdispatcher/contexts/openapi_context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,3 +258,8 @@ def _clean_backup(self, local_root, keep_backup=True):
258258
dir_to_be_removed = os.path.join(local_root, "backup")
259259
if os.path.exists(dir_to_be_removed):
260260
shutil.rmtree(dir_to_be_removed)
261+
262+
def block_call(self, cmd):
263+
raise RuntimeError(
264+
"Unsupported method. You may use an unsupported combination of the machine and the context."
265+
)

dpdispatcher/contexts/ssh_context.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -767,41 +767,6 @@ def download(
767767
tar_compress=self.remote_profile.get("tar_compress", None),
768768
)
769769

770-
def block_checkcall(self, cmd, asynchronously=False, stderr_whitelist=None):
771-
"""Run command with arguments. Wait for command to complete. If the return code
772-
was zero then return, otherwise raise RuntimeError.
773-
774-
Parameters
775-
----------
776-
cmd : str
777-
The command to run.
778-
asynchronously : bool, optional, default=False
779-
Run command asynchronously. If True, `nohup` will be used to run the command.
780-
stderr_whitelist : list of str, optional, default=None
781-
If not None, the stderr will be checked against the whitelist. If the stderr
782-
contains any of the strings in the whitelist, the command will be considered
783-
successful.
784-
"""
785-
assert self.remote_root is not None
786-
self.ssh_session.ensure_alive()
787-
if asynchronously:
788-
cmd = f"nohup {cmd} >/dev/null &"
789-
stdin, stdout, stderr = self.ssh_session.exec_command(
790-
(f"cd {shlex.quote(self.remote_root)} ;") + cmd
791-
)
792-
exit_status = stdout.channel.recv_exit_status()
793-
if exit_status != 0:
794-
raise RuntimeError(
795-
"Get error code %d in calling %s through ssh with job: %s . message: %s"
796-
% (
797-
exit_status,
798-
cmd,
799-
self.submission.submission_hash,
800-
stderr.read().decode("utf-8"),
801-
)
802-
)
803-
return stdin, stdout, stderr
804-
805770
def block_call(self, cmd):
806771
assert self.remote_root is not None
807772
self.ssh_session.ensure_alive()

0 commit comments

Comments
 (0)