Skip to content

Commit b80d4cf

Browse files
authored
automatically switch to rsync backend (#170)
1 parent 2807f2e commit b80d4cf

File tree

2 files changed

+60
-8
lines changed

2 files changed

+60
-8
lines changed

dpdispatcher/ssh_context.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
from dpdispatcher.base_context import BaseContext
55
import os, paramiko, tarfile, time
66
import uuid
7+
import shutil
8+
from functools import lru_cache
79
from glob import glob
810
from dpdispatcher import dlog
911
from dargs.dargs import Argument
1012
from typing import List
1113
import pathlib
1214
# from dpdispatcher.submission import Machine
13-
from dpdispatcher.utils import get_sha256, generate_totp
15+
from dpdispatcher.utils import get_sha256, generate_totp, rsync
1416

1517
class SSHSession (object):
1618
def __init__(self,
@@ -175,6 +177,27 @@ def arginfo():
175177
ssh_remote_profile_format = Argument("ssh_session", dict, ssh_remote_profile_args)
176178
return ssh_remote_profile_format
177179

180+
def put(self, from_f, to_f):
181+
if self.rsync_available:
182+
return rsync(from_f, self.remote + ":" + to_f)
183+
return self.sftp.put(from_f, to_f)
184+
185+
def get(self, from_f, to_f):
186+
if self.rsync_available:
187+
return rsync(self.remote + ":" + from_f, to_f)
188+
return self.sftp.get(from_f, to_f)
189+
190+
@property
191+
@lru_cache(maxsize=None)
192+
def rsync_available(self) -> bool:
193+
return (shutil.which("rsync") is not None and self.password is None
194+
and self.port == 22 and self.key_filename is None
195+
and self.passphrase is None)
196+
197+
@property
198+
def remote(self) -> str:
199+
return "%s@%s" % (self.username, self.hostname)
200+
178201

179202
class SSHContext(BaseContext):
180203
def __init__ (self,
@@ -519,7 +542,7 @@ def _put_files(self,
519542
from_f = pathlib.PurePath(os.path.join(self.local_root, of)).as_posix()
520543
to_f = pathlib.PurePath(os.path.join(self.remote_root, of)).as_posix()
521544
try:
522-
self.sftp.put(from_f, to_f)
545+
self.ssh_session.put(from_f, to_f)
523546
except FileNotFoundError:
524547
raise FileNotFoundError("from %s to %s @ %s : %s Error!"%(from_f, self.ssh_session.username, self.ssh_session.hostname, to_f))
525548
# remote extract
@@ -547,7 +570,7 @@ def _get_files(self,
547570
to_f = pathlib.PurePath(os.path.join(self.local_root, of)).as_posix()
548571
if os.path.isfile(to_f) :
549572
os.remove(to_f)
550-
self.sftp.get(from_f, to_f)
573+
self.ssh_session.get(from_f, to_f)
551574
# extract
552575
cwd = os.getcwd()
553576
os.chdir(self.local_root)

dpdispatcher/utils.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,37 @@ def generate_totp(secret: str, period: int=30, token_length: int=6) -> int:
6363
token = base % (10**token_length)
6464
return str(token).zfill(token_length)
6565

66-
def run_cmd_with_all_output(cmd):
67-
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
68-
out, err = proc.communicate()
69-
ret = proc.returncode
70-
return (ret, out, err)
66+
67+
def run_cmd_with_all_output(cmd, shell=True):
68+
with subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
69+
out, err = proc.communicate()
70+
ret = proc.returncode
71+
return (ret, out, err)
72+
73+
74+
def rsync(from_file: str, to_file: str):
75+
"""Call rsync to transfer files.
76+
77+
Parameters
78+
----------
79+
from_file: str
80+
SRC
81+
to_file: str
82+
DEST
83+
84+
Raises
85+
------
86+
RuntimeError
87+
when return code is not 0
88+
"""
89+
cmd = [
90+
'rsync',
91+
# -a: archieve
92+
# -z: compress
93+
'-az',
94+
from_file,
95+
to_file,
96+
]
97+
ret, out, err = run_cmd_with_all_output(cmd, shell=False)
98+
if ret != 0:
99+
raise RuntimeError("Failed to run %s: %s" %(cmd, err))

0 commit comments

Comments
 (0)