-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserde.py
More file actions
89 lines (62 loc) · 2.02 KB
/
Copy pathserde.py
File metadata and controls
89 lines (62 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import annotations
import base64
import json
from typing import NamedTuple
Base64Blob = str
def encode_blob_to_base64(data: bytes) -> str:
"""Encode binary data to a base64 string for JSON storage."""
return base64.b64encode(data).decode()
def decode_base64_blob(base64_blob: Base64Blob) -> bytes:
if not base64_blob:
return b""
parts = base64_blob.rsplit(":", 1)
return base64.b64decode(parts[-1])
class MySQLPendingWrite(NamedTuple):
"""The pending write tuple we receive from our DB query."""
task_id: str
channel: str
type_: str
blob: Base64Blob
idx: int
def deserialize_pending_writes(value: str) -> list[tuple[str, str, str, bytes]]:
if not value:
return []
values = (MySQLPendingWrite(*write) for write in json.loads(value))
return [
(db.task_id, db.channel, db.type_, decode_base64_blob(db.blob))
for db in sorted(values, key=lambda db: (db.task_id, db.idx))
]
class MySQLPendingSend(NamedTuple):
"""The pending send tuple we receive from our DB query."""
task_path: str
task_id: str
type_: str
blob: Base64Blob
idx: int
def deserialize_pending_sends(value: str) -> list[tuple[str, bytes]]:
if not value:
return []
values = (MySQLPendingSend(*send) for send in json.loads(value))
return [
(db.type_, decode_base64_blob(db.blob))
for db in sorted(values, key=lambda db: (db.task_path, db.task_id, db.idx))
]
class MySQLChannelValue(NamedTuple):
"""The channel value tuple we receive from our DB query."""
channel: str
type_: str
blob: Base64Blob | None
def deserialize_channel_values(
value: str,
) -> list[tuple[str, str, bytes | None]]:
if not value:
return []
values = (MySQLChannelValue(*channel_value) for channel_value in json.loads(value))
return [
(
db.channel,
db.type_,
decode_base64_blob(db.blob) if db.blob is not None else None,
)
for db in values
]