-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlog_parser.py
More file actions
111 lines (91 loc) · 5.54 KB
/
log_parser.py
File metadata and controls
111 lines (91 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from datetime import datetime, timedelta, timezone
import re
from state_types import ProgressState, CompletionState
from utils import convert_size
class LogParseException(Exception):
    """Raised when a log line of a recognized type cannot be parsed."""
class LogParser:
    """Parse backup log output line by line and publish updates.

    Feeds each log line through :meth:`parse_line`, accumulating results in a
    ``CompletionState`` and publishing intermediate ``ProgressState`` updates
    through ``update_handler``, which must provide ``send_progress(state)``
    and ``send_completion(state)``.
    """

    # Compiled once at class level; each pattern matches one log-line shape.
    # Overall line shape: "<timestamp> <level> <type> <message>".
    line_re = re.compile(r'(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) (?P<level>\S+) (?P<type>\S+) (?P<message>.*)')
    revision_re = re.compile(r'Backup for .+ at revision (?P<revision>\d+) completed')
    files_re = re.compile(r'Files\: (?P<files>\d+[\d.,]*\d*) total, (?P<size>\d+[\d.,]*\d*)(?P<size_unit>[TGMK]?) bytes; (?P<new_files>\d+[\d.,]*\d*) new, (?P<new_size>\d+[\d.,]*\d*)(?P<new_size_unit>[TGMK]?) bytes')
    chunks_re = re.compile(r'All chunks\: (?P<chunks>\d+[\d.,]*\d*) total, (?P<size>\d+[\d.,]*\d*)(?P<size_unit>[TGMK]?) bytes; (?P<new_chunks>\d+[\d.,]*\d*) new, (?P<new_size>\d+[\d.,]*\d*)(?P<new_size_unit>[TGMK]?) bytes')
    time_re = re.compile(r'Total running time\: (?:(?P<days>\d+) days? )?(?P<hours>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})')
    progress_re = re.compile(r'(?:Uploaded|Skipped) chunk \d+ size \d+, (?P<speed>\d+[\d.,]*\d*)(?P<unit>[TGMK]?)B/s (?:(?P<days>\d+) days? )?(?P<hours>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2}) (?P<percent>\d+[\d.,]*\d*)%')

    def __init__(self, update_handler):
        """Create a parser that reports to *update_handler*."""
        self.update_handler = update_handler
        self.completion_state = CompletionState()
        # Epoch sentinel (local tz) so the very first progress line is
        # always published by the once-per-second rate limiter.
        self.last_publish_progress = datetime.fromtimestamp(0, timezone.utc).astimezone()

    @staticmethod
    def _to_number(text, cast=int):
        """Convert a captured numeric string via *cast*, stripping ','.

        The capture patterns (``\\d+[\\d.,]*\\d*``) deliberately admit comma
        group separators, but ``int()``/``float()`` reject them — so a value
        like ``'1,234'`` would previously have raised ``ValueError``.
        """
        return cast(text.replace(',', ''))

    def parse_line(self, line):
        """Dispatch one raw log line by its level and type.

        Lines that do not match the expected overall shape are silently
        ignored. WARN/ERROR lines are collected verbatim; BACKUP_START /
        BACKUP_END / BACKUP_STATS / UPLOAD_PROGRESS lines update state and
        may publish progress.
        """
        match = self.line_re.match(line)
        if not match:
            return
        ts = datetime.fromisoformat(match['datetime']).astimezone()
        if match['level'] == 'WARN':
            self.completion_state.warnings.append(line)
        elif match['level'] == 'ERROR':
            self.completion_state.errors.append(line)
        elif match['type'] == 'BACKUP_START':
            self.completion_state.time_started = ts
            # Zeroed progress marks the start of a run.
            self.update_handler.send_progress(
                ProgressState(0.0, timedelta(), timedelta(), 0.0))
        elif match['type'] == 'BACKUP_END':
            self.completion_state.time_finished = ts
            # Extra False flag presumably marks progress as inactive —
            # confirm against ProgressState's definition.
            self.update_handler.send_progress(
                ProgressState(0.0, timedelta(), timedelta(), 0.0, False))
            self._parse_backup_end(match['message'])
        elif match['type'] == 'BACKUP_STATS':
            self._parse_backup_stats(match['message'])
        elif match['type'] == 'UPLOAD_PROGRESS':
            self._parse_upload_progress(match['message'], ts)

    def handle_return_code(self, return_code):
        """Finalize the run once the underlying command has exited.

        Records a non-zero *return_code* as an error, backfills a missing
        finish time, publishes the completion state, and clears progress.
        """
        if return_code != 0:
            self.completion_state.errors.append(f'Non-zero return code: {return_code}')
        # In case end time was never set (assumes CompletionState defaults
        # time_finished to the UTC epoch — TODO confirm in state_types).
        if self.completion_state.time_finished == datetime.fromtimestamp(0, timezone.utc):
            self.completion_state.time_finished = datetime.now(timezone.utc).astimezone()
        self.update_handler.send_completion(self.completion_state)
        # Redundant reset of progress, in case the underlying command
        # crashed without ever emitting BACKUP_END.
        self.update_handler.send_progress(
            ProgressState(0.0, timedelta(), timedelta(), 0.0, False))

    def _match(self, compiled_re, line):
        """Match *line* against *compiled_re* or record and raise a failure.

        Returns the match object on success; on failure appends an error to
        the completion state and raises LogParseException.
        """
        match = compiled_re.match(line)
        if not match:
            error = f'Failed to parse "{line}"'
            self.completion_state.errors.append(error)
            raise LogParseException(error)
        return match

    def _parse_backup_end(self, line):
        """Extract the completed revision number from a BACKUP_END message."""
        match = self._match(self.revision_re, line)
        self.completion_state.revision = int(match['revision'])

    def _parse_backup_stats(self, line):
        """Parse the BACKUP_STATS summary lines into completion_state.

        Handles three line shapes ('Files:', 'All chunks:', 'Total running
        time:'); sizes are converted via convert_size with target unit 'G'.
        Raises LogParseException (via _match) when a recognized prefix fails
        to parse fully; other lines are ignored.
        """
        if line.startswith('Files:'):
            match = self._match(self.files_re, line)
            self.completion_state.files = self._to_number(match['files'])
            self.completion_state.files_size = convert_size(
                match['size'], match['size_unit'], 'G')
            self.completion_state.new_files = self._to_number(match['new_files'])
            self.completion_state.new_files_size = convert_size(
                match['new_size'], match['new_size_unit'], 'G')
        elif line.startswith('All chunks:'):
            match = self._match(self.chunks_re, line)
            self.completion_state.chunks = self._to_number(match['chunks'])
            self.completion_state.chunks_size = convert_size(
                match['size'], match['size_unit'], 'G')
            self.completion_state.new_chunks = self._to_number(match['new_chunks'])
            self.completion_state.new_chunks_size = convert_size(
                match['new_size'], match['new_size_unit'], 'G')
        elif line.startswith('Total running time:'):
            match = self._match(self.time_re, line)
            # The 'N days ' prefix is optional in the log format.
            self.completion_state.time_elapsed = timedelta(
                days=int(match['days'] or 0), hours=int(match['hours']),
                minutes=int(match['min']), seconds=int(match['sec']))

    def _parse_upload_progress(self, line, ts):
        """Parse an Uploaded/Skipped chunk line and publish progress.

        Publishing is rate-limited to at most once per second based on the
        log line timestamps.
        """
        match = self._match(self.progress_re, line)
        progress = ProgressState(
            # Bug fix: convert the captured percentage to float — every
            # other ProgressState in this class passes a float (0.0), but
            # the raw match is a string.
            self._to_number(match['percent'], float),
            timedelta(days=int(match['days'] or 0), hours=int(match['hours']),
                      minutes=int(match['min']), seconds=int(match['sec'])),
            ts - self.completion_state.time_started,
            convert_size(match['speed'], match['unit'], 'M')
        )
        if (ts - self.last_publish_progress).total_seconds() >= 1:
            self.last_publish_progress = ts
            self.update_handler.send_progress(progress)