forked from pupil-labs/pi_preview
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsensor.py
More file actions
117 lines (98 loc) · 3.33 KB
/
sensor.py
File metadata and controls
117 lines (98 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from pyglui import ui
from pi_preview import GAZE_SENSOR_TYPE, Linked_Device
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class GazeSensor:
    """Gaze sensor of a single linked host, reached through a sensor network.

    The instance remembers which host it is linked to (``host_uuid`` /
    ``host_name``) and, while connected, holds the sensor handle obtained
    from ``network.sensor(...)`` in ``_ndsi_sensor``.

    States, as reported by :attr:`status`:

    * "Not linked"    -- ``host_uuid`` is falsy.
    * "Not connected" -- linked, but no sensor handle is held.
    * "Connected"     -- linked and a sensor handle is held.
    """

    # Not referenced inside this class.
    get_data_timeout = 100  # ms

    def __init__(self, network, linked_device):
        """Store the link information and connect right away if linked.

        Args:
            network: Object exposing ``sensors`` (a mapping whose values are
                sensor description mappings) and ``sensor(uuid, callbacks=...)``.
            linked_device: Object with ``uuid`` and ``name`` attributes
                describing the host to link to; a falsy ``uuid`` means
                "not linked".
        """
        self._ndsi_sensor = None
        self.network = network
        self.host_uuid = linked_device.uuid
        self.host_name = linked_device.name
        # Not read anywhere inside this class.
        self.offset = [0, 0]
        if self.is_linked:
            self.activate()

    @property
    def is_linked(self):
        """``True`` if a host uuid is set."""
        return bool(self.host_uuid)

    @property
    def is_online(self):
        """``True`` while a sensor handle is held."""
        return bool(self._ndsi_sensor)

    @property
    def status(self):
        """Human-readable connection state (see the class docstring)."""
        if not self.is_linked:
            return "Not linked"
        if not self.is_online:
            return "Not connected"
        return "Connected"

    @property
    def linked_device(self):
        """The current link as a ``Linked_Device(host_uuid, host_name)``."""
        return Linked_Device(self.host_uuid, self.host_name)

    def unlink(self):
        """Disconnect and forget the linked host and the network.

        ``host_name`` is left as it was.
        """
        self.deactivate()
        self.host_uuid = None
        self.network = None

    def activate(self):
        """Connect to the linked host's gaze sensor and enable streaming.

        Logs an error and returns if no host is linked. If the network lists
        no sensor of type ``GAZE_SENSOR_TYPE`` for the linked host, any
        existing connection is dropped and the method returns. Otherwise
        ``host_name`` is refreshed from the sensor description, a new sensor
        handle is created with :meth:`on_notification` as its callback, and
        the "streaming" control is set to ``True``.
        """
        if not self.is_linked:
            logger.error("Cannot activate unlinked sensor")
            return

        candidates = (
            s
            for s in self.network.sensors.values()
            if s["sensor_type"] == GAZE_SENSOR_TYPE
            and s["host_uuid"] == self.host_uuid
        )
        sensor = next(candidates, None)
        if sensor is None:
            logger.debug("Host not found")
            self.deactivate()
            return

        # Release a handle from an earlier activation before replacing it;
        # otherwise the old handle would be overwritten without ever being
        # unlinked.
        self.deactivate()

        self.host_name = sensor["host_name"]
        self._ndsi_sensor = self.network.sensor(
            sensor["sensor_uuid"], callbacks=(self.on_notification,)
        )
        self._ndsi_sensor.set_control_value("streaming", True)
        self._ndsi_sensor.refresh_controls()
        logger.info("Linked %s", self._ndsi_sensor)

    def deactivate(self):
        """Unlink and drop the sensor handle, if one is held."""
        if self.is_online:
            self._ndsi_sensor.unlink()
            self._ndsi_sensor = None

    def poll_notifications(self):
        """Handle every pending notification of the connected sensor."""
        if self.is_online:
            while self._ndsi_sensor.has_notifications:
                self._ndsi_sensor.handle_notification()

    def on_notification(self, sensor, event):
        """Sensor callback: log errors and updates of known controls.

        Args:
            sensor: The notifying sensor (unused; the held handle is used).
            event: Mapping with a "subject" key; "error" events carry
                "error_str", control events carry "control_id".
        """
        if event["subject"] == "error":
            logger.warning("Error %s", event["error_str"])

        ndsi_sensor = self._ndsi_sensor
        if ndsi_sensor is None:
            # No handle is held, so there are no controls to look up.
            return

        if (
            "control_id" in event
            and event["control_id"] in ndsi_sensor.controls
        ):
            logger.info("%s", ndsi_sensor.controls[event["control_id"]])

    def fetch_data(self):
        """Return the sensor's pending samples as gaze dictionaries.

        Each ``(x, y, ts)`` triple yielded by the sensor is converted with
        :meth:`_make_gaze_pos`. Returns an empty list while not connected.
        """
        if not self.is_online:
            return []
        return [
            self._make_gaze_pos(x, y, ts)
            for (x, y, ts) in self._ndsi_sensor.fetch_data()
        ]

    @staticmethod
    def _make_gaze_pos(x, y, ts, frame_size_x=1080, frame_size_y=1080):
        """Build a gaze dictionary from a coordinate pair and a timestamp.

        ``x`` and ``y`` are divided by the frame size, and the vertical axis
        is flipped (``1.0 - y / frame_size_y``). Confidence is always 1.0.
        """
        return {
            "topic": "gaze.pi",
            "norm_pos": [x / frame_size_x, 1.0 - y / frame_size_y],
            "timestamp": ts,
            "confidence": 1.0,
        }

    def add_ui_elements(self, menu):
        """Append status fields to ``menu``.

        The setters discard their input, so the text fields only display the
        current values. The "Linked device" field is added only if a host is
        linked at the time of the call.
        """
        menu.append(
            ui.Text_Input("status", self, label="Status", setter=lambda _: None)
        )
        if self.is_linked:
            menu.append(
                ui.Text_Input(
                    "host_name", self, label="Linked device", setter=lambda _: None
                )
            )