-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathdaemon_log.py
More file actions
110 lines (83 loc) · 2.56 KB
/
daemon_log.py
File metadata and controls
110 lines (83 loc) · 2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python3
import asyncio
import re
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Union
import aiofiles
# Semaphore with 60 permits; logger_client acquires one around every
# connection it opens.
SEM = asyncio.Semaphore(60)
# TCP port that logger_client connects to on the target host.
LOGGER_PORT = 9071
class LineBuffer(bytearray):
    """A ``bytearray`` whose ``find`` locates any line ending.

    hack to deal with StreamReader not allowing a regex pattern: the
    separator argument is ignored and the position of the first ``\\r\\n``,
    ``\\r`` or ``\\n`` at or after *offset* is returned instead.

    NOTE(review): ``find`` only reports where the match starts; how many
    bytes the caller consumes after that index is decided by the caller,
    not by the length of the regex match -- verify against
    ``asyncio.StreamReader.readuntil`` if ``\\r\\n`` handling matters.
    """

    SEP = re.compile(b'(?:\r\n)|\r|\n')

    def find(self, _, offset=0):
        """Return the index of the first line ending from *offset*, or -1.

        The first positional argument (the separator a caller would pass to
        ``bytearray.find``) is accepted for signature compatibility and
        ignored. *offset* defaults to 0 so that ``buf.find(sep)`` works just
        like it does on a plain ``bytearray``.
        """
        match = self.SEP.search(self, offset)
        return match.start() if match else -1
class DummyLogger:
    """Stand-in log sink that discards everything written to it.

    Offers the same async surface the rest of the file uses on a log file
    (``async with`` plus ``write``). Every operation only yields to the
    event loop once and does nothing else.
    """

    @staticmethod
    async def _yield_to_loop():
        # sleep(0) hands control back to the event loop for one iteration.
        await asyncio.sleep(0)

    async def __aenter__(self):
        """Enter the context and return this logger."""
        await self._yield_to_loop()
        return self

    async def __aexit__(self, *exc_info):
        """Leave the context; exceptions are not suppressed."""
        await self._yield_to_loop()

    async def write(self, _):
        """Discard the given text."""
        await self._yield_to_loop()
def decode(data: bytes) -> str:
    """Turn raw bytes into text using latin-1 (one character per byte)."""
    text = data.decode(encoding='latin-1', errors='strict')
    return text
def encode(data: str) -> bytes:
    """Turn text into bytes using latin-1.

    Raises UnicodeEncodeError for characters above U+00FF.
    """
    raw = data.encode(encoding='latin-1', errors='strict')
    return raw
@asynccontextmanager
async def get_logger(log: Union[Path, None]):
    """Async context manager yielding the sink that log text is written to.

    With ``log`` set to ``None`` a ``DummyLogger`` is yielded. Otherwise the
    file at ``log`` is opened through ``aiofiles`` in ``'w+'`` mode with
    UTF-8 encoding and the opened file object is yielded. The sink's own
    context is exited when this context exits.
    """
    sink_cm = (
        DummyLogger()
        if log is None
        else aiofiles.open(log, 'w+', encoding='utf-8')
    )
    async with sink_cm as sink:
        yield sink
@asynccontextmanager
async def open_connection(host: str, port: int):
    """Connect to ``host:port``, retrying until it works, and yield the streams.

    Connection attempts that fail with ``OSError`` are retried once per
    second, indefinitely. Once connected, ``(reader, writer)`` is yielded.
    On exit the writer is drained and then closed; the close happens even
    when draining raises, so the transport is never left open. An exception
    raised by the drain still propagates to the caller.
    """
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)
        except OSError:
            # Peer not reachable yet: wait a little and try again.
            await asyncio.sleep(1)
        else:
            break

    try:
        yield reader, writer
    finally:
        try:
            await writer.drain()
        finally:
            # Must run even if drain() raised, otherwise the socket leaks.
            writer.close()
async def log_task(reader: asyncio.StreamReader, logger: 'Union[DummyLogger, aiofiles.threadpool.AsyncTextIOWrapper]'):
    """Copy lines from ``reader`` to stdout and to ``logger`` until EOF.

    Every line is decoded as latin-1 and printed unchanged. Only lines that
    contain no ``'\\r'`` are written to ``logger``. When the stream reaches
    EOF and the last line read contained ``'\\r'``, that line is written to
    ``logger`` with each ``'\\r'`` replaced by ``'\\n'``.

    Args:
        reader: stream to read lines from.
        logger: object with an awaitable ``write(str)`` method.
    """
    # Must be str, not bytes: the EOF branch tests "'\r' in line", which
    # raises TypeError on bytes when the stream is already at EOF.
    line = ''
    while True:
        if reader.at_eof():
            if '\r' in line:
                await logger.write(line.replace('\r', '\n'))
            break
        try:
            raw = await reader.readline()
        except OSError:
            # NOTE(review): the read is simply retried after a pause; if the
            # reader keeps raising the same error this never terminates --
            # confirm whether returning here would be preferable.
            await asyncio.sleep(1)
            continue
        line = raw.decode('latin-1')
        if '\r' not in line:
            await logger.write(line)
        print(line, end='')
async def logger_client(host: str):
    """Keep streaming log output from ``host`` into ``daemon_log.txt``.

    The log sink is obtained once for the path ``daemon_log.txt``. After
    that the function loops forever: it takes a permit from ``SEM``,
    connects to ``host`` on ``LOGGER_PORT``, runs ``log_task`` on the
    connection, and starts over once ``log_task`` returns.
    """
    log_path = Path('daemon_log.txt')
    async with get_logger(log_path) as logger:
        while True:
            async with SEM, open_connection(host, LOGGER_PORT) as (reader, _writer):
                # Replace the reader's private buffer with a LineBuffer
                # holding the same bytes, so its find() is used from now on.
                reader._buffer = LineBuffer(reader._buffer)
                await log_task(reader, logger)
def main():
    """Command-line entry point: ``daemon_log.py <ps5ip>``.

    Expects exactly one argument, the host to connect to. With any other
    argument count the usage line is written to stderr and the process
    exits with status 1. Otherwise ``logger_client`` runs until it is
    interrupted; ``KeyboardInterrupt`` (Ctrl-C) ends the program quietly.
    """
    if len(sys.argv) != 2:
        # sys.exit(str) prints the message to stderr and exits with status 1,
        # so a usage error is not reported as success.
        sys.exit(f'usage: {__file__} ps5ip')
    try:
        asyncio.run(logger_client(sys.argv[1]))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()