-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathsend_elf.py
More file actions
375 lines (316 loc) · 11 KB
/
send_elf.py
File metadata and controls
375 lines (316 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
#!/usr/bin/env python3
import argparse
import asyncio
import re
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Union
import aiofiles
# Shared limiter entered with `async with SEM` by most of the client
# coroutines in this file (send_elf is the exception); at most 60 holders.
SEM = asyncio.Semaphore(60)

# TCP ports on the target, annotated with the helper that connects to each.
ORIGINAL_ELF_PORT = 9020  # send_spawner
LOGGER_PORT = 9021  # logger_client
KLOGGER_PORT = 9081  # klog_client
COMMAND_PORT = 9028  # has_homebrew_daemon
ELF_PORT = 9027  # send_elf and send_daemon

# send_elf truncates or NUL-pads the process name to exactly this many characters.
MAX_PROCESS_NAME_LENGTH = 32

# First byte written by send_elf; which one is chosen depends on args.game.
DAEMON_TYPE = b'\x01'
GAME_TYPE = b'\x02'

# Single-byte status values; currently not used by any active code path here.
RESPONSE_OK = b'\x00'
RESPONSE_ERROR = b'\xff'
class LineBuffer(bytearray):
    r"""``bytearray`` whose ``find`` locates the next line break of any style.

    The client helpers in this file install an instance as ``reader._buffer``
    so that line-oriented reads stop at ``\r\n``, ``\r`` or ``\n`` instead of
    only at the separator the caller asked for.
    """

    # hack to deal with StreamReader not allowing a regex pattern
    SEP = re.compile(b'(?:\r\n)|\r|\n')

    def find(self, _, offset=0):
        """Return the index of the first line break at or after *offset*.

        The requested separator (first argument) is deliberately ignored; the
        class-level ``SEP`` pattern is searched instead. ``offset`` defaults
        to 0 so the method can be called like ``bytearray.find(sub)``.

        Returns -1 when no line break is found, mirroring ``bytearray.find``.
        """
        match = self.SEP.search(self, offset)
        return match.start() if match else -1
class DummyLogger:
    """No-op stand-in for an async log file.

    Offers the same ``async with`` / ``await write(...)`` surface as the real
    logger but discards everything; each operation only yields to the event
    loop once.
    """

    @staticmethod
    async def _yield_once():
        # Give other tasks a chance to run so every method really awaits.
        await asyncio.sleep(0)

    async def __aenter__(self):
        await self._yield_once()
        return self

    async def __aexit__(self, *args):
        await self._yield_once()

    async def write(self, _):
        """Discard the text that would have been logged."""
        await self._yield_once()
def decode(data: bytes) -> str:
    """Return *data* as text, decoding it as Latin-1."""
    return str(data, 'latin-1')
def encode(data: str) -> bytes:
    """Return *data* as bytes, encoding it as Latin-1."""
    return bytes(data, 'latin-1')
def get_logger(log: Union[Path, None]):
    """Return an async context manager that yields a log sink.

    A ``None`` path yields a ``DummyLogger`` (nothing is written); any other
    path is opened through ``aiofiles`` in ``'w+'`` mode as UTF-8 text.
    """
    return DummyLogger() if log is None else aiofiles.open(log, 'w+', encoding='utf-8')
class ParsedArgs:
    """argparse namespace that normalises path options and owns the log sinks.

    An instance is handed to ``parse_args(namespace=...)``. The path-like
    options (``elf``, ``spawner``, ``log``, ``klog``) are properties whose
    setters convert the incoming strings to ``Path`` objects, and setting
    ``nolog`` to a true value clears both log paths.

    Used as an async context manager, the instance opens ``logger`` and
    ``klogger`` on entry (via ``get_logger``) and closes them on exit.
    """

    def __init__(self):
        self.ip: str = None
        self._elf: Path = None
        self._spawner = Path('bin/spawner.elf')
        self._log = Path('log.txt')
        self._klog = Path('klog.txt')
        self.silent = False
        self.game = False
        self.name: str = None
        # Context managers returned by get_logger; kept so __aexit__ can close them.
        self._logger: aiofiles.threadpool.AiofilesContextManager = None
        self._klogger: aiofiles.threadpool.AiofilesContextManager = None
        # Objects produced by entering the managers above; callers await .write() on them.
        self.logger: DummyLogger | aiofiles.threadpool.AsyncTextIOWrapper = None
        self.klogger: DummyLogger | aiofiles.threadpool.AsyncTextIOWrapper = None

    @property
    def host(self) -> str:
        """Alias for ``ip``."""
        return self.ip

    @property
    def elf(self) -> Union[Path, None]:
        """Path of the ELF to send, or ``None`` when none was given."""
        return self._elf

    @elf.setter
    def elf(self, elf: Union[str, None]):
        # Empty string and None both mean "no ELF".
        self._elf = Path(elf) if elf else None

    @property
    def spawner(self) -> Path:
        """Path of the spawner ELF."""
        return self._spawner

    @spawner.setter
    def spawner(self, spawner: str):
        self._spawner = Path(spawner)

    @property
    def log(self) -> Path:
        """Path of the log file, or ``None`` when file logging is disabled."""
        return self._log

    @log.setter
    def log(self, log: str):
        self._log = Path(log)

    @property
    def klog(self) -> Path:
        """Path of the klog file, or ``None`` when file logging is disabled."""
        return self._klog

    @klog.setter
    def klog(self, klog: str):
        self._klog = Path(klog)

    @property
    def nolog(self) -> bool:
        """True when both log paths have been cleared."""
        return self._log is None and self._klog is None

    @nolog.setter
    def nolog(self, value: bool):
        # Only a true value has an effect; it disables both files at once.
        if value:
            self._log = None
            self._klog = None

    async def __aenter__(self):
        # get_logger maps a None path to a DummyLogger, so the "no log" case
        # needs no separate branch and logger/klogger are always usable.
        self._logger = get_logger(self.log)
        self.logger = await self._logger.__aenter__()
        self._klogger = get_logger(self.klog)
        self.klogger = await self._klogger.__aenter__()
        return self

    async def __aexit__(self, *args):
        try:
            if self._logger is not None:
                try:
                    await self._logger.__aexit__(*args)
                except Exception:
                    # Best effort: a failure closing the log must not prevent
                    # the klog from being closed. Cancellation and interrupts
                    # are not Exception subclasses and still propagate.
                    pass
                finally:
                    self._logger = None
                    self.logger = None
        finally:
            if self._klogger is not None:
                try:
                    await self._klogger.__aexit__(*args)
                finally:
                    self._klogger = None
                    self.klogger = None
        # ensure we have awaited at least once
        await asyncio.sleep(0)
@asynccontextmanager
async def open_connection(host: str, port: int, wait=True):
    """Async context manager yielding a ``(reader, writer)`` stream pair.

    With ``wait`` true, a failed connection attempt (``OSError``) is retried
    once per second until it succeeds. With ``wait`` false, a failed attempt
    is not retried and ``(None, None)`` is yielded, so callers must check for
    ``None``.

    On exit the writer, if any, is drained and then closed. The close happens
    even when draining raises, so a broken connection does not leak its
    transport; the drain error still propagates.
    """
    reader = None
    writer = None
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
        except OSError:
            if wait:
                await asyncio.sleep(1)
                continue
        break
    try:
        yield reader, writer
    finally:
        if writer is not None:
            try:
                await writer.drain()
            finally:
                writer.close()
async def send_elf(args: ParsedArgs):
    """Send ``args.elf`` to ELF_PORT and stream the connection's output.

    ``args.name`` is rewritten in place to exactly MAX_PROCESS_NAME_LENGTH
    characters (truncated, or padded with NUL characters). The bytes written
    are, in order: the process type byte, the encoded name, the ELF size as
    an 8-byte little-endian integer, and the ELF contents. Always returns
    ``False``.
    """
    await args.logger.write('sending_elf\n')

    name = args.name
    if len(name) > MAX_PROCESS_NAME_LENGTH:
        args.name = name[:MAX_PROCESS_NAME_LENGTH]
        print(f'process name was truncated to {args.name}')
    else:
        # ljust is a no-op when the name already has the full length
        args.name = name.ljust(MAX_PROCESS_NAME_LENGTH, '\0')

    async with open_connection(args.host, ELF_PORT) as (reader, writer):
        payload = args.elf.read_bytes()
        writer.write(GAME_TYPE if args.game else DAEMON_TYPE)
        writer.write(encode(args.name))
        writer.write(len(payload).to_bytes(8, byteorder='little'))
        writer.write(payload)
        writer.write_eof()
        await writer.drain()
        # NOTE: reading a one-byte status reply (RESPONSE_OK / RESPONSE_ERROR)
        # is disabled; whatever the peer sends is passed to log_task instead.
        await log_task(reader, args.logger, False)
    return False
async def log_task(reader: asyncio.StreamReader, logger: 'Union[DummyLogger, aiofiles.threadpool.AsyncTextIOWrapper]', silent = False):
    """Copy lines from *reader* to *logger* and stdout until end of stream.

    Each line is decoded as Latin-1. Lines containing the
    'a looped back NS message is' text are dropped, and nothing is logged or
    printed when *silent* is true. A line containing ``'\\r'`` is printed
    but not written to *logger*; only if it is the last line of the stream is
    it written, with ``'\\r'`` replaced by ``'\\n'``.

    An ``OSError`` raised while reading is retried after one second.
    """
    # Last carriage-return line that was printed but not logged yet. Kept as
    # str so the end-of-stream check never mixes bytes and str.
    pending = ''
    while not reader.at_eof():
        try:
            raw = await reader.readline()
        except OSError:
            await asyncio.sleep(1)
            continue
        if not raw:
            # Empty read signals end of stream; keep `pending` for the flush.
            continue
        line = raw.decode('latin-1')
        pending = ''
        if 'a looped back NS message is' in line:
            # STFU
            continue
        if silent:
            continue
        if '\r' in line:
            pending = line
        else:
            await logger.write(line)
        print(line, end='')
    if pending:
        await logger.write(pending.replace('\r', '\n'))
async def has_homebrew_daemon(host: str) -> bool:
    """Check whether the homebrew daemon is already running on the PS5.

    Makes a single connection attempt to COMMAND_PORT, writes one zero byte
    and reports whether one zero byte comes back. A failed connection or any
    ``OSError`` counts as "not running".
    """
    async with SEM:
        try:
            async with open_connection(host, COMMAND_PORT, wait=False) as (reader, writer):
                if writer is not None:
                    writer.write(b'\x00')
                    reply = await reader.read(1)
                    return reply == b'\x00'
        except OSError:
            pass
    return False
async def logger_client(args: ParsedArgs):
    """Send the spawner file to LOGGER_PORT and log the peer's output.

    Waits until the connection succeeds, writes the contents of
    ``args.spawner`` followed by EOF, then passes everything the peer sends
    to ``log_task`` with ``args.logger``.
    """
    async with SEM, open_connection(args.host, LOGGER_PORT) as (reader, writer):
        # Swap in LineBuffer so line reads use its line-break pattern.
        reader._buffer = LineBuffer(reader._buffer)
        payload = args.spawner.read_bytes()
        writer.write(payload)
        writer.write_eof()
        await writer.drain()
        await log_task(reader, args.logger, args.silent)
        print('logger client finished')
async def send_spawner(args: ParsedArgs):
    """Send the spawner file to ORIGINAL_ELF_PORT, then run ``logger_client``.

    Only one connection attempt is made; if it fails the function returns
    without doing anything else.
    """
    async with SEM, open_connection(args.host, ORIGINAL_ELF_PORT, wait=False) as (reader, writer):
        if writer is None:
            # Nothing is listening on the port.
            return
        reader._buffer = LineBuffer(reader._buffer)
        payload = args.spawner.read_bytes()
        writer.write(payload)
        writer.write_eof()
        await writer.drain()
        await logger_client(args)
async def send_daemon(args: ParsedArgs):
    """Send ``bin/daemon.elf`` to ELF_PORT.

    Waits until the connection succeeds, then writes the file size as an
    8-byte little-endian integer followed by the file contents and EOF.
    """
    async with SEM, open_connection(args.host, ELF_PORT) as (reader, writer):
        reader._buffer = LineBuffer(reader._buffer)
        daemon = Path('bin/daemon.elf').read_bytes()
        size_prefix = len(daemon).to_bytes(8, byteorder='little')
        writer.write(size_prefix)
        writer.write(daemon)
        writer.write_eof()
        await writer.drain()
async def klog_client(args: ParsedArgs):
    """Connect to KLOGGER_PORT and pass its output to ``log_task``.

    Waits until the connection succeeds; lines are written to
    ``args.klogger`` and honour ``args.silent``.
    """
    async with SEM, open_connection(args.host, KLOGGER_PORT) as (reader, _):
        # Swap in LineBuffer so line reads use its line-break pattern.
        reader._buffer = LineBuffer(reader._buffer)
        await log_task(reader, args.klogger, args.silent)
async def run_loggers(args: ParsedArgs):
    """Top-level coroutine: bootstrap the daemon if needed, then send and log.

    If the daemon probe fails, the spawner and then the daemon are sent.
    Afterwards, when ``args.elf`` is set, the ELF is sent while the klog
    client runs concurrently; otherwise only the klog client runs. The log
    sinks on *args* stay open for the whole duration.
    """
    async with args:
        has_daemon = await has_homebrew_daemon(args.host)
        print(f'has_daemon: {has_daemon}')
        if not has_daemon:
            print('start send_spawner(args)')
            await send_spawner(args)
            print('start send_daemon(args)')
            await send_daemon(args)
            print('finished')

        if not args.elf:
            await klog_client(args)
            return

        # Default the process name to the ELF file name without its extension.
        if args.name is None:
            args.name = args.elf.stem
        tasks = (
            asyncio.create_task(send_elf(args)),
            asyncio.create_task(klog_client(args)),
        )
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
def main():
    """Parse the command line and run ``run_loggers`` until done or Ctrl-C.

    Arguments are parsed into a ``ParsedArgs`` namespace. The function prints
    a message and returns early when the spawner file, or a configured ELF,
    does not exist. ``KeyboardInterrupt`` is swallowed so Ctrl-C exits
    quietly.
    """
    parser = argparse.ArgumentParser(
        description='Helper script for sending the spawner, an elf to load into spawned process and for logging',
    )
    parser.add_argument('ip', help='PS5 ip address')
    # NOTE: the positional `elf` argument is currently disabled, so args.elf
    # keeps its initial value of None.
    #parser.add_argument('elf', help='Path to the elf to load into the spawned process')
    parser.add_argument(
        '--spawner',
        default='bin/spawner.elf',
        help='Path to the elf used to spawn the new process. (default: bin/spawner.elf)'
    )
    parser.add_argument(
        '--log',
        default='log.txt',
        help='Path to output log file if enabled. (default: log.txt)'
    )
    parser.add_argument(
        '--klog',
        default='klog.txt',
        help='Path to output klog file if enabled. (default: klog.txt)'
    )
    parser.add_argument(
        '--nolog',
        default=False,
        action='store_true',
        help='Switch to disable logging to an output file. (default False)'
    )
    parser.add_argument(
        '--silent',
        default=False,
        action='store_true',
        help='Switch to disable all logging. (default: False)'
    )
    parser.add_argument(
        '--game',
        default=False,
        action='store_true',
        help='Set process type to game. (default: False)'
    )
    parser.add_argument(
        '--name',
        default=None,
        help='The name of the new process. (default: the elf file name without the extension)'
    )
    args: ParsedArgs = parser.parse_args(namespace=ParsedArgs())
    try:
        if not args.spawner.exists():
            print(f'{args.spawner} does not exist')
            return
        if args.elf and not args.elf.exists():
            print(f'{args.elf} does not exist')
            return
        print(f'{__file__}: start')
        asyncio.run(run_loggers(args))
    except KeyboardInterrupt:
        pass
# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()