33import shlex
44from dataclasses import dataclass
55from pathlib import Path
6+ from subprocess import CalledProcessError
67
78from archinstall .lib .disk .utils import get_lsblk_info
89
910from .exceptions import DiskError , SysCallError
10- from .general import SysCommand , SysCommandWorker , generate_password
11+ from .general import SysCommand , SysCommandWorker , generate_password , run
1112from .output import debug , info
1213
1314
@@ -58,29 +59,29 @@ def _password_bytes(self) -> bytes:
5859 else :
5960 return bytes (self .password , 'UTF-8' )
6061
61- def _get_key_file (self , key_file : Path | None = None ) -> Path :
62- if key_file :
63- return key_file
62+ def _get_passphrase_args (
63+ self ,
64+ key_file : Path | None = None
65+ ) -> tuple [list [str ], bytes | None ]:
66+ key_file = key_file or self .key_file
6467
65- if self . key_file :
66- return self . key_file
68+ if key_file :
69+ return [ '--key-file' , str ( key_file )], None
6770
68- default_key_file = Path (f'/tmp/{ self .luks_dev_path .name } .disk_pw' )
69- default_key_file .write_bytes (self ._password_bytes ())
70- return default_key_file
71+ return [], self ._password_bytes ()
7172
7273 def encrypt (
7374 self ,
7475 key_size : int = 512 ,
7576 hash_type : str = 'sha512' ,
7677 iter_time : int = 10000 ,
7778 key_file : Path | None = None
78- ) -> Path :
79+ ) -> Path | None :
7980 debug (f'Luks2 encrypting: { self .luks_dev_path } ' )
8081
81- key_file = self ._get_key_file (key_file )
82+ key_file_arg , passphrase = self ._get_passphrase_args (key_file )
8283
83- cryptsetup_args = shlex . join ( [
84+ cmd = [
8485 'cryptsetup' ,
8586 '--batch-mode' ,
8687 '--verbose' ,
@@ -89,19 +90,20 @@ def encrypt(
8990 '--hash' , hash_type ,
9091 '--key-size' , str (key_size ),
9192 '--iter-time' , str (iter_time ),
92- '--key-file' , str ( key_file ) ,
93+ * key_file_arg ,
9394 '--use-urandom' ,
94- 'luksFormat' , str (self .luks_dev_path ),
95- ])
95+ 'luksFormat' , str (self .luks_dev_path )
96+ ]
9697
97- debug (f'cryptsetup format: { cryptsetup_args } ' )
98+ debug (f'cryptsetup format: { shlex . join ( cmd ) } ' )
9899
99100 try :
100- result = SysCommand (cryptsetup_args ).decode ()
101- except SysCallError as err :
102- raise DiskError (f'Could not encrypt volume "{ self .luks_dev_path } ": { err } ' )
101+ result = run (cmd , input_data = passphrase )
102+ except CalledProcessError as err :
103+ output = err .stdout .decode ().rstrip ()
104+ raise DiskError (f'Could not encrypt volume "{ self .luks_dev_path } ": { output } ' )
103105
104- debug (f'cryptsetup luksFormat output: { result } ' )
106+ debug (f'cryptsetup luksFormat output: { result . stdout . decode (). rstrip () } ' )
105107
106108 self .key_file = key_file
107109
@@ -132,17 +134,19 @@ def unlock(self, key_file: Path | None = None) -> None:
132134 if not self .mapper_name :
133135 raise ValueError ('mapper name missing' )
134136
135- key_file = self ._get_key_file (key_file )
137+ key_file_arg , passphrase = self ._get_passphrase_args (key_file )
138+
139+ cmd = [
140+ 'cryptsetup' , 'open' ,
141+ str (self .luks_dev_path ),
142+ str (self .mapper_name ),
143+ * key_file_arg ,
144+ '--type' , 'luks2'
145+ ]
136146
137- result = SysCommand (
138- 'cryptsetup open '
139- f'{ self .luks_dev_path } '
140- f'{ self .mapper_name } '
141- f'--key-file { key_file } '
142- f'--type luks2'
143- ).decode ()
147+ result = run (cmd , input_data = passphrase )
144148
145- debug (f'cryptsetup open output: { result } ' )
149+ debug (f'cryptsetup open output: { result . stdout . decode (). rstrip () } ' )
146150
147151 if not self .mapper_dev or not self .mapper_dev .is_symlink ():
148152 raise DiskError (f'Failed to open luks2 device: { self .luks_dev_path } ' )
0 commit comments