From aeb2d684c2a95e8f56f3ed6423d788908a1c5e53 Mon Sep 17 00:00:00 2001 From: Brian Beck Date: Wed, 25 Mar 2026 14:39:45 -0700 Subject: [PATCH 1/2] Run lint/format across all files * Update the makefile to run them against all python files. * Fix syntax error in requirements_dev.txt (trailing period). * Update flake8 to v5. * Fix broken imports in unit tests * Run `make format` to get all files into a lint-clean state in preparation for future work. * Fix errors called out by `make lint` Fixes #59 Tested: ``` make setup python -m unittest make format make lint ``` --- software/Makefile | 2 +- software/authbox/__init__.py | 12 +- software/authbox/api.py | 4 +- .../authbox/badgereader_hid_keystroking.py | 5 +- software/authbox/badgereader_wiegand_gpio.py | 5 +- software/authbox/fake_gpio_for_testing.py | 5 +- software/authbox/gpio_button.py | 28 +- software/authbox/gpio_buzzer.py | 3 +- software/authbox/gpio_relay.py | 8 +- .../authbox/tests/setup_mock_pin_factory.py | 16 +- software/authbox/tests/test_api.py | 30 ++- .../tests/test_badgereader_hid_keystroking.py | 5 +- .../tests/test_badgereader_wiegand_gpio.py | 9 +- software/authbox/tests/test_config.py | 4 +- software/authbox/tests/test_gpio_button.py | 25 +- software/authbox/tests/test_gpio_buzzer.py | 14 +- software/authbox/tests/test_gpio_relay.py | 9 +- software/authbox/tests/test_timer.py | 4 +- software/lockbox.py | 101 +++---- software/qa.py | 90 ++++--- software/requirements-dev.txt | 4 +- software/test_qa.py | 4 +- software/test_two_button.py | 85 +++--- software/two_button.py | 250 ++++++++++-------- 24 files changed, 383 insertions(+), 339 deletions(-) diff --git a/software/Makefile b/software/Makefile index a61cab9..38f2898 100644 --- a/software/Makefile +++ b/software/Makefile @@ -1,5 +1,5 @@ .ONESHELL: -SOURCES=authbox setup.py +SOURCES=authbox *.py ifeq ($(shell grep ^ID= /etc/os-release | cut -d = -f 2), raspbian) RASPBIAN=1 diff --git a/software/authbox/__init__.py b/software/authbox/__init__.py index 9cce9d4..db5ea8b 100755 --- a/software/authbox/__init__.py +++ b/software/authbox/__init__.py @@ -19,9 +19,11 @@ __version__ = "2.0.0" try: - from gpiozero import Device - del Device + from gpiozero import Device + + del Device except ModuleNotFoundError: - print("ERROR: 'gpiozero' must be installed") - import sys - sys.exit(1) + print("ERROR: 'gpiozero' must be installed") + import sys + + sys.exit(1) diff --git a/software/authbox/api.py b/software/authbox/api.py index 3308735..4fb7d2e 100755 --- a/software/authbox/api.py +++ b/software/authbox/api.py @@ -130,9 +130,7 @@ def run(self): class BasePinThread(BaseDerivedThread): - def __init__( - self, event_queue, config_name, input_pin, output_pin - ): + def __init__(self, event_queue, config_name, input_pin, output_pin): super(BasePinThread, self).__init__(event_queue, config_name) self.input_pin = input_pin diff --git a/software/authbox/badgereader_hid_keystroking.py b/software/authbox/badgereader_hid_keystroking.py index 2f3e549..8d6ded1 100755 --- a/software/authbox/badgereader_hid_keystroking.py +++ b/software/authbox/badgereader_hid_keystroking.py @@ -21,6 +21,7 @@ from authbox.api import BaseDerivedThread, NoMatchingDevice + class HIDKeystrokingReader(BaseDerivedThread): """Badge reader hardware abstraction. @@ -178,9 +179,9 @@ def __init__( def setUp(self): try: - import evdev + import evdev # noqa: F401 except ModuleNotFoundError: - self.fail("evdev not available") + self.fail("evdev not available") def get_scanner_device(self): """Finds connected device matching device_name. diff --git a/software/authbox/badgereader_wiegand_gpio.py b/software/authbox/badgereader_wiegand_gpio.py index 3fadf0b..9dc9c32 100755 --- a/software/authbox/badgereader_wiegand_gpio.py +++ b/software/authbox/badgereader_wiegand_gpio.py @@ -73,8 +73,8 @@ def __init__( self.timeout_in_seconds = float(timeout_in_ms) / 1000 if self._on_scan: - self.d0_input_device.when_activated = self.decode; - self.d1_input_device.when_activated = self.decode; + self.d0_input_device.when_activated = self.decode + self.d1_input_device.when_activated = self.decode def decode(self, channel): bit = "0" if channel == self.d0_input_device else "1" @@ -119,4 +119,3 @@ def close(self): self.d0_input_device.close() if self.d1_input_device: self.d1_input_device.close() - diff --git a/software/authbox/fake_gpio_for_testing.py b/software/authbox/fake_gpio_for_testing.py index c249812..0a7ea29 100644 --- a/software/authbox/fake_gpio_for_testing.py +++ b/software/authbox/fake_gpio_for_testing.py @@ -16,19 +16,20 @@ from __future__ import print_function -import time +import time # noqa: F401 def _log_match(a, b): return abs(a[0] - b[0]) < 0.1 and a[1] == b[1] and a[2] == b[2] + class FakeTime(object): """Fake for the module 'time' so tests run faster.""" def __init__(self): self.t = 0 - def time(self): + def time(self): # noqa: F811 return self.t def sleep(self, x): diff --git a/software/authbox/gpio_button.py b/software/authbox/gpio_button.py index b52c0c2..b0f03ea 100755 --- a/software/authbox/gpio_button.py +++ b/software/authbox/gpio_button.py @@ -15,10 +15,12 @@ """Abstraction for blinky buttons. """ +import time + +import gpiozero + from authbox.api import BasePinThread from authbox.compat import queue -import gpiozero -import time class Button(BasePinThread): @@ -55,26 +57,26 @@ def __init__( self.steady_state = False self.gpio_led = gpiozero.LED(pin="BOARD" + str(self.output_pin)) button_pin = "BOARD" + str(self.input_pin) - self.gpio_button = gpiozero.Button(button_pin, bounce_time = 0.15) + self.gpio_button = gpiozero.Button(button_pin, bounce_time=0.15) if self._on_down: self.gpio_button.when_pressed = self._callback def _callback(self): """Wrapper to queue events instead of calling them directly.""" # If we have a callback registered, debounce the switch press - if (self._on_down): + if self._on_down: # This is a de-bounce filter to prevent spurious signals from triggering the logic # Looks for 5 continuous active states (each separated by 10ms) - maxcount = 15 # Look for 150ms maximum - lowcount = 0 # Count the number of active states seen - while ((maxcount > 0) and (lowcount <= 4)): - time.sleep(0.01) # 10ms delay between each cycle - maxcount = maxcount - 1 # Decrement remaining cycles - if (self.gpio_button.is_pressed): - lowcount = lowcount + 1 # One more low cycle detected + maxcount = 15 # Look for 150ms maximum + lowcount = 0 # Count the number of active states seen + while (maxcount > 0) and (lowcount <= 4): + time.sleep(0.01) # 10ms delay between each cycle + maxcount = maxcount - 1 # Decrement remaining cycles + if self.gpio_button.is_pressed: + lowcount = lowcount + 1 # One more low cycle detected else: - lowcount = 0 # Not continuously low, reset - if (lowcount > 4): + lowcount = 0 # Not continuously low, reset + if lowcount > 4: self.event_queue.put((self._on_down, self)) def run_inner(self): diff --git a/software/authbox/gpio_buzzer.py b/software/authbox/gpio_buzzer.py index 0056cd0..5e42dc4 100644 --- a/software/authbox/gpio_buzzer.py +++ b/software/authbox/gpio_buzzer.py @@ -16,9 +16,10 @@ """ from __future__ import print_function -import gpiozero import time +import gpiozero + from authbox.api import BasePinThread from authbox.compat import queue diff --git a/software/authbox/gpio_relay.py b/software/authbox/gpio_relay.py index 4c3d976..c418ca2 100755 --- a/software/authbox/gpio_relay.py +++ b/software/authbox/gpio_relay.py @@ -38,11 +38,11 @@ class Relay(BasePinThread): """ def __init__(self, event_queue, config_name, output_type, output_pin): - super(Relay, self).__init__( - event_queue, config_name, None, int(output_pin) - ) + super(Relay, self).__init__(event_queue, config_name, None, int(output_pin)) self.output_on_val = types[output_type] - self.gpio_relay = gpiozero.DigitalOutputDevice("BOARD" + str(output_pin), initial_value= not types[output_type]) + self.gpio_relay = gpiozero.DigitalOutputDevice( + "BOARD" + str(output_pin), initial_value=not types[output_type] + ) # TODO: Push this initial setup into BasePinThread, to avoid a momentary glitch self.off() diff --git a/software/authbox/tests/setup_mock_pin_factory.py b/software/authbox/tests/setup_mock_pin_factory.py index 9b2afb5..c84c1e3 100755 --- a/software/authbox/tests/setup_mock_pin_factory.py +++ b/software/authbox/tests/setup_mock_pin_factory.py @@ -1,10 +1,12 @@ try: - from gpiozero import Device - if not Device.pin_factory: - from gpiozero.pins.mock import MockFactory - Device.pin_factory = MockFactory() + from gpiozero import Device + + if not Device.pin_factory: + from gpiozero.pins.mock import MockFactory + + Device.pin_factory = MockFactory() except ModuleNotFoundError: - print("ERROR: 'gpiozero' must be installed") - import sys - sys.exit(1) + print("ERROR: 'gpiozero' must be installed") + import sys + sys.exit(1) diff --git a/software/authbox/tests/test_api.py b/software/authbox/tests/test_api.py index be81914..f58ec5e 100755 --- a/software/authbox/tests/test_api.py +++ b/software/authbox/tests/test_api.py @@ -14,17 +14,18 @@ """Tests for authbox.api""" -import gpiozero -import gpiozero.pins.mock import tempfile import unittest -import setup_mock_pin_factory +import gpiozero +import gpiozero.pins.mock # noqa: F401 import authbox.api import authbox.config import authbox.gpio_button +from . import setup_mock_pin_factory # noqa: F401 + SAMPLE_CONFIG = b""" [pins] button_a = Button:11:38 @@ -41,17 +42,18 @@ def test_no_duplicate_names(self): self.assertEqual(len(short_names), len(authbox.api.CLASS_REGISTRY)) def test_all_names_importable(self): - try: - import evdev - del evdev - except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") - - for c in authbox.api.CLASS_REGISTRY: - cls = authbox.api._import(c) - assert issubclass( - cls, (authbox.api.BasePinThread, authbox.api.BaseDerivedThread) - ), (c, cls, cls.__bases__) + try: + import evdev + + del evdev + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + + for c in authbox.api.CLASS_REGISTRY: + cls = authbox.api._import(c) + assert issubclass( + cls, (authbox.api.BasePinThread, authbox.api.BaseDerivedThread) + ), (c, cls, cls.__bases__) class DispatcherTest(unittest.TestCase): diff --git a/software/authbox/tests/test_badgereader_hid_keystroking.py b/software/authbox/tests/test_badgereader_hid_keystroking.py index bcdf01d..e704291 100755 --- a/software/authbox/tests/test_badgereader_hid_keystroking.py +++ b/software/authbox/tests/test_badgereader_hid_keystroking.py @@ -17,16 +17,15 @@ import unittest import authbox.badgereader_hid_keystroking - from authbox.compat import queue class BadgereaderTest(unittest.TestCase): def setUp(self): try: - from authbox import fake_evdev_device_for_testing + from authbox import fake_evdev_device_for_testing except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") + self.fail("Test requires evdev, but evdev is not available") authbox.badgereader_hid_keystroking.evdev.list_devices = ( fake_evdev_device_for_testing.list_devices diff --git a/software/authbox/tests/test_badgereader_wiegand_gpio.py b/software/authbox/tests/test_badgereader_wiegand_gpio.py index 37e0fed..1525d6d 100755 --- a/software/authbox/tests/test_badgereader_wiegand_gpio.py +++ b/software/authbox/tests/test_badgereader_wiegand_gpio.py @@ -14,16 +14,15 @@ """Tests for authbox.badgereader_wiegand_gpio""" -import gpiozero import threading import time import unittest -import setup_mock_pin_factory - import authbox.badgereader_wiegand_gpio from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + class BadgereaderWiegandGPIOTest(unittest.TestCase): def setUp(self): @@ -35,7 +34,7 @@ def setUp(self): "40", on_scan=self.on_scan, ) - + def tearDown(self): self.b.close() @@ -70,7 +69,7 @@ def add_bits_later(): def test_limited_queue_size(self): self.b.d1_input_device.pin.drive_low() for i in range(500): - # Send a 0 + # Send a 0 self.b.d0_input_device.pin.drive_high() self.b.d0_input_device.pin.drive_low() self.b.run_inner() diff --git a/software/authbox/tests/test_config.py b/software/authbox/tests/test_config.py index 427cfe1..f6c0ce7 100755 --- a/software/authbox/tests/test_config.py +++ b/software/authbox/tests/test_config.py @@ -16,10 +16,10 @@ import unittest -import setup_mock_pin_factory - import authbox.config +from . import setup_mock_pin_factory # noqa: F401 + class ConfigTest(unittest.TestCase): def test_parse_time(self): diff --git a/software/authbox/tests/test_gpio_button.py b/software/authbox/tests/test_gpio_button.py index 3b9cd17..117c332 100755 --- a/software/authbox/tests/test_gpio_button.py +++ b/software/authbox/tests/test_gpio_button.py @@ -17,12 +17,13 @@ import unittest from functools import partial -import setup_mock_pin_factory - import authbox.gpio_button from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestButton(authbox.gpio_button.Button): def drive_high(self): self.gpio_button.pin.drive_high() @@ -32,7 +33,7 @@ def clear_states(self): self.gpio_led.pin.clear_states() def assert_button_states(self, expected_states): - assert len(self.gpio_button.pin.states) == len(expected_states) + assert len(self.gpio_button.pin.states) == len(expected_states) self.gpio_button.pin.assert_states(expected_states) def assert_led_states(self, expected_states): @@ -43,6 +44,7 @@ def close(self): self.gpio_button.close() self.gpio_led.close() + class ImpatientQueue(queue.Queue): def __init__(self, fake_time): queue.Queue.__init__(self) @@ -55,6 +57,7 @@ def get(self, block, timeout): raise queue.Empty return queue.Queue.get(self, block=block, timeout=timeout) + class BlinkTest(unittest.TestCase): def setUp(self): self.time = fake_gpio_for_testing.FakeTime() @@ -71,15 +74,15 @@ def setUp(self): self.b.clear_states() def tearDown(self): - self.b.close() + self.b.close() def on_down(self): pass def test_on(self): # Verify pins are correctly configured - self.assertEqual('input', self.b.gpio_button.pin._function) - self.assertEqual('output', self.b.gpio_led.pin._function) + self.assertEqual("input", self.b.gpio_button.pin._function) + self.assertEqual("output", self.b.gpio_led.pin._function) self.b.drive_high() self.b.run_inner() @@ -94,7 +97,9 @@ def test_blinking_thread(self): self.b.run_inner() # Number of on/off transitions corresponds to number of run_inner() calls - self.b.assert_led_states([False, True, False, True, False, True, False, True, False]) + self.b.assert_led_states( + [False, True, False, True, False, True, False, True, False] + ) def test_finite_blink_count_from_off(self): self.b.off() @@ -159,10 +164,10 @@ def test_overlapping_blink_counts(self): # won't be user visible because it's the same "on" state as the initial # steady state to which we return. self.b.assert_led_states( - [ - False, # Startup off state + [ + False, # Startup off state True, # Initial steady state - False, # First blink cycle starts + False, # First blink cycle starts True, # Second blink cycle starts False, True, # Second blink cycle ends after 4 cycles diff --git a/software/authbox/tests/test_gpio_buzzer.py b/software/authbox/tests/test_gpio_buzzer.py index 764c567..08d5d92 100755 --- a/software/authbox/tests/test_gpio_buzzer.py +++ b/software/authbox/tests/test_gpio_buzzer.py @@ -16,12 +16,13 @@ import unittest -import setup_mock_pin_factory - import authbox.gpio_buzzer from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestBuzzer(authbox.gpio_buzzer.Buzzer): def assert_states(self, expected_states): assert len(self.gpio_buzzer.pin.states) == len(expected_states) @@ -31,7 +32,8 @@ def clear_states(self): self.gpio_buzzer.pin.clear_states() def close(self): - self.gpio_buzzer.close() + self.gpio_buzzer.close() + class BuzzerTest(unittest.TestCase): def setUp(self): @@ -41,9 +43,9 @@ def setUp(self): self.q = queue.Queue() self.b = TestBuzzer(self.q, "b", "15") self.b.clear_states() - + def tearDown(self): - self.b.close() + self.b.close() def test_on(self): self.time.sleep(2) @@ -59,7 +61,7 @@ def test_off(self): self.assertRaises(queue.Empty, self.b.run_inner, False) self.b.assert_states([False]) - def test_beep(self): + def test_beep(self): self.b.beep() self.b.run_inner(False) self.b.on() diff --git a/software/authbox/tests/test_gpio_relay.py b/software/authbox/tests/test_gpio_relay.py index de44ca6..3a6c750 100755 --- a/software/authbox/tests/test_gpio_relay.py +++ b/software/authbox/tests/test_gpio_relay.py @@ -16,18 +16,21 @@ import unittest -import setup_mock_pin_factory - import authbox.gpio_relay from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestRelay(authbox.gpio_relay.Relay): def assert_states(self, expected_states): assert len(self.gpio_relay.pin.states) == len(expected_states) self.gpio_relay.pin.assert_states(expected_states) + def clear_states(self): - self.gpio_relay.pin.clear_states() + self.gpio_relay.pin.clear_states() + class RelayTest(unittest.TestCase): def setUp(self): diff --git a/software/authbox/tests/test_timer.py b/software/authbox/tests/test_timer.py index a67e068..47a5a28 100755 --- a/software/authbox/tests/test_timer.py +++ b/software/authbox/tests/test_timer.py @@ -16,11 +16,11 @@ import unittest -import setup_mock_pin_factory - import authbox.timer from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + class TimerTest(unittest.TestCase): def setUp(self): diff --git a/software/lockbox.py b/software/lockbox.py index ff53c05..a7256c1 100644 --- a/software/lockbox.py +++ b/software/lockbox.py @@ -19,66 +19,69 @@ """ from __future__ import print_function -import atexit import os -import sys -import subprocess import shlex +import subprocess +import sys from authbox.api import BaseDispatcher from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') - -class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - self.load_config_object('badge_reader', on_scan=self.badge_scan) - self.load_config_object('output_relay') +DEVNULL = open("/dev/null", "r+") - self.disable_timer = Timer(self.event_queue, 'disable_timer', self.disable) - # Otherwise, start them manually! - self.threads.extend([self.disable_timer]) +class Dispatcher(BaseDispatcher): + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + self.load_config_object("badge_reader", on_scan=self.badge_scan) + self.load_config_object("output_relay") + + self.disable_timer = Timer(self.event_queue, "disable_timer", self.disable) + # Otherwise, start them manually! + self.threads.extend([self.disable_timer]) + + def _get_command_line(self, section, key, format_args): + """Constructs a command line, safely. + + The value can contain {key}, {}, and {5} style interpolation: + - {key} will be resolved in the config.get; those are considered safe and + spaces will separate args. + - {} works on each arg independently (probably not what you want). + - {5} works fine. + """ + value = self.config.get(section, key) + pieces = shlex.split(value) + return [p.format(*format_args) for p in pieces] + + def badge_scan(self, badge_id): + # Malicious badge "numbers" that contain spaces require this extra work. + command = self._get_command_line("auth", "command", [badge_id]) + # TODO timeout + # TODO test with missing command + rc = subprocess.call(command) + + if rc == 0: + self.disable_timer.cancel() + self.output_relay.on() + self.disable_timer.set( + self.config.get_int_seconds("auth", "duration", "1s") + ) + + def disable(self, source=None): + self.output_relay.off() - def _get_command_line(self, section, key, format_args): - """Constructs a command line, safely. - - The value can contain {key}, {}, and {5} style interpolation: - - {key} will be resolved in the config.get; those are considered safe and - spaces will separate args. - - {} works on each arg independently (probably not what you want). - - {5} works fine. - """ - value = self.config.get(section, key) - pieces = shlex.split(value) - return [p.format(*format_args) for p in pieces] - - def badge_scan(self, badge_id): - # Malicious badge "numbers" that contain spaces require this extra work. - command = self._get_command_line('auth', 'command', [badge_id]) - # TODO timeout - # TODO test with missing command - rc = subprocess.call(command) - - if rc == 0: - self.disable_timer.cancel() - self.output_relay.on() - self.disable_timer.set(self.config.get_int_seconds('auth', 'duration', '1s')) - - def disable(self, source=None): - self.output_relay.off() def main(args): - if not args: - root = '~' - else: - root = args[0] + if not args: + root = "~" + else: + root = args[0] + + config = Config(os.path.join(root, ".authboxrc")) + Dispatcher(config).run_loop() - config = Config(os.path.join(root, '.authboxrc')) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/software/qa.py b/software/qa.py index 9c5371f..cb47fd5 100644 --- a/software/qa.py +++ b/software/qa.py @@ -26,54 +26,56 @@ from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') +DEVNULL = open("/dev/null", "r+") + class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - for i in range(1, 6+1): - self.load_config_object('j%d' % i, on_down=self.on_button_down) - - self.load_config_object('buzzer') - self.load_config_object('relays') - # This may be a MultiProxy, which makes this easier. - self.relays.on() - self.relay_value = True - - # Run fans for 10 seconds on startup - self.relay_timer = Timer(self.event_queue, 'relay_timer', self.toggle_relays) - self.relay_toggle_interval = 10 - self.relay_timer.set(self.relay_toggle_interval) - self.threads.extend([self.relay_timer]) - - def on_button_down(self, source): - print("Button down", source) - self.buzzer.beep() - - source.on() - time.sleep(0.3) - source.off() - - def toggle_relays(self, source): - print("Toggle relay", self.relay_value) - if self.relay_value: - self.relays.off() - self.relay_value = False - else: - self.relays.on() - self.relay_value = True - self.relay_timer.set(self.relay_toggle_interval) + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + for i in range(1, 6 + 1): + self.load_config_object("j%d" % i, on_down=self.on_button_down) + + self.load_config_object("buzzer") + self.load_config_object("relays") + # This may be a MultiProxy, which makes this easier. + self.relays.on() + self.relay_value = True + + # Run fans for 10 seconds on startup + self.relay_timer = Timer(self.event_queue, "relay_timer", self.toggle_relays) + self.relay_toggle_interval = 10 + self.relay_timer.set(self.relay_toggle_interval) + self.threads.extend([self.relay_timer]) + + def on_button_down(self, source): + print("Button down", source) + self.buzzer.beep() + + source.on() + time.sleep(0.3) + source.off() + + def toggle_relays(self, source): + print("Toggle relay", self.relay_value) + if self.relay_value: + self.relays.off() + self.relay_value = False + else: + self.relays.on() + self.relay_value = True + self.relay_timer.set(self.relay_toggle_interval) def main(args): - if not args: - config_filename = 'qa.ini' - else: - config_filename = args[0] + if not args: + config_filename = "qa.ini" + else: + config_filename = args[0] + + config = Config(config_filename) + Dispatcher(config).run_loop() - config = Config(config_filename) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/software/requirements-dev.txt b/software/requirements-dev.txt index 8fb0edf..942a649 100755 --- a/software/requirements-dev.txt +++ b/software/requirements-dev.txt @@ -1,8 +1,8 @@ gpiozero==2.0 -nose==1.3. +nose==1.3 coverage==7.2.1 evdev==1.6.1 isort==5.11.5 black==23.1.0 ; python_version >= '3.6' tox==3.28.0 -flake8==3.9.2 +flake8==5.0.4 diff --git a/software/test_qa.py b/software/test_qa.py index 5ea3216..7ddca1e 100644 --- a/software/test_qa.py +++ b/software/test_qa.py @@ -14,8 +14,8 @@ import unittest + class QATest(unittest.TestCase): # TODO def test_import(self): - import qa - + import qa # noqa: F401 diff --git a/software/test_two_button.py b/software/test_two_button.py index e6594b0..ff130c5 100755 --- a/software/test_two_button.py +++ b/software/test_two_button.py @@ -14,22 +14,18 @@ """Tests for two_button.py""" -import sys -import unittest import tempfile -import threading -import time +import unittest from gpiozero import Device from gpiozero.pins.mock import MockFactory -import two_button - import authbox.api import authbox.badgereader_hid_keystroking -from authbox import fake_gpio_for_testing +import two_button +from authbox import fake_gpio_for_testing # noqa: F401 -SAMPLE_CONFIG = b''' +SAMPLE_CONFIG = b""" [pins] on_button=Button:11:38 off_button=Button:16:37 @@ -44,47 +40,52 @@ command = touch enabled extend_command = touch enabled deauth_command = rm -f enabled -''' +""" + # This is the fastest way to ensure that basic logic is right, but it does not # test the use of BaseDispatcher.event_queue or the way callbacks happen on the # same thread serialized. class SimpleDispatcherTest(unittest.TestCase): - def setUp(self): - Device.pin_factory = MockFactory() + def setUp(self): + Device.pin_factory = MockFactory() - try: - from authbox import fake_evdev_device_for_testing - except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") - authbox.badgereader_hid_keystroking.evdev.list_devices = fake_evdev_device_for_testing.list_devices - authbox.badgereader_hid_keystroking.evdev.InputDevice = fake_evdev_device_for_testing.InputDevice + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) - with tempfile.NamedTemporaryFile() as f: - f.write(SAMPLE_CONFIG) - f.flush() - config = authbox.config.Config(f.name) + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG) + f.flush() + config = authbox.config.Config(f.name) - self.dispatcher = two_button.Dispatcher(config) + self.dispatcher = two_button.Dispatcher(config) - def is_relay_on(self): - relay = getattr(self.dispatcher, "enable_output") - return relay.gpio_relay.value + def is_relay_on(self): + relay = getattr(self.dispatcher, "enable_output") + return relay.gpio_relay.value - def test_auth_flow(self): - # Out of the box, relay should be off - self.assertFalse(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) - # Badge scan sets authorized flag, but doesn't enable relay until button - # press. - self.dispatcher.badge_scan('1234') - self.assertTrue(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) - # "On" button pressed - self.dispatcher.on_button_down(None) - self.assertTrue(self.dispatcher.authorized) - self.assertTrue(self.is_relay_on()) - # "Off" button pressed - self.dispatcher.abort(None) - self.assertFalse(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) + def test_auth_flow(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) + # Badge scan sets authorized flag, but doesn't enable relay until button + # press. + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on()) + # "Off" button pressed + self.dispatcher.abort(None) + self.assertFalse(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) diff --git a/software/two_button.py b/software/two_button.py index 5383df1..92b9b0b 100644 --- a/software/two_button.py +++ b/software/two_button.py @@ -19,131 +19,153 @@ """ from __future__ import print_function -import atexit import os -import sys -import subprocess import shlex +import subprocess +import sys from authbox.api import BaseDispatcher from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') +DEVNULL = open("/dev/null", "r+") + class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - self.authorized = False - self.load_config_object('on_button', on_down=self.on_button_down) - self.load_config_object('off_button', on_down=self.abort) - self.load_config_object('badge_reader', on_scan=self.badge_scan) - self.load_config_object('enable_output') - self.load_config_object('buzzer') - self.warning_timer = Timer(self.event_queue, 'warning_timer', self.warning) - self.expire_timer = Timer(self.event_queue, 'expire_timer', self.abort) - self.expecting_press_timer = Timer(self.event_queue, 'expecting_press_timer', self.abort) - # Otherwise, start them manually! - self.threads.extend([self.warning_timer, self.expire_timer, self.expecting_press_timer]) - - self.noise = None - - def _get_command_line(self, section, key, format_args): - """Constructs a command line, safely. - - The value can contain {key}, {}, and {5} style interpolation: - - {key} will be resolved in the config.get; those are considered safe and - spaces will separate args. - - {} works on each arg independently (probably not what you want). - - {5} works fine. - """ - value = self.config.get(section, key) - pieces = shlex.split(value) - return [p.format(*format_args) for p in pieces] - - def badge_scan(self, badge_id): - # Malicious badge "numbers" that contain spaces require this extra work. - command = self._get_command_line('auth', 'command', [badge_id]) - # TODO timeout - # TODO test with missing command - rc = subprocess.call(command) - if rc == 0: - self.buzzer.beep() - self.authorized = True - self.badge_id = badge_id - self.expecting_press_timer.set(30) - self.on_button.blink() - else: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'sad_filename')]) - self.noise = subprocess.Popen(sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - - def on_button_down(self, source): - print("Button down", source) - if not self.authorized: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'sad_filename')]) - self.noise = subprocess.Popen(sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - return - self.expecting_press_timer.cancel() - self.on_button.on() - self.enable_output.on() - self.buzzer.off() - self.warning_timer.cancel() - self.expire_timer.cancel() - # TODO use extend time if we were already enabled, and run its command for - # logging. - # N.b. Duration (or extend) includes the warning time. - self.warning_timer.set(self.config.get_int_seconds('auth', 'duration', '5m') - - self.config.get_int_seconds('auth', 'warning', '10s')) - self.expire_timer.set(self.config.get_int_seconds('auth', 'duration', '5m')) - if self.noise: - self.noise.kill() - self.noise = None - - def abort(self, source): - print("Abort", source) - self.enable_output.off() - if self.authorized: - command = self._get_command_line('auth', 'deauth_command', [self.badge_id]) - subprocess.call(command) - self.off_button.blink(1) - self.buzzer.beep() - self.authorized = False - self.warning_timer.cancel() - self.expecting_press_timer.cancel() - self.expire_timer.cancel() - self.on_button.off() - self.buzzer.off() - if self.noise: - self.noise.kill() - self.noise = None - - def warning(self, unused_source): - self.buzzer.beepbeep() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'warning_filename')]) - self.noise = subprocess.Popen(shlex.split(sound_command), stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - self.on_button.blink() + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + self.authorized = False + self.load_config_object("on_button", on_down=self.on_button_down) + self.load_config_object("off_button", on_down=self.abort) + self.load_config_object("badge_reader", on_scan=self.badge_scan) + self.load_config_object("enable_output") + self.load_config_object("buzzer") + self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning) + self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort) + self.expecting_press_timer = Timer( + self.event_queue, "expecting_press_timer", self.abort + ) + # Otherwise, start them manually! + self.threads.extend( + [self.warning_timer, self.expire_timer, self.expecting_press_timer] + ) + + self.noise = None + + def _get_command_line(self, section, key, format_args): + """Constructs a command line, safely. + + The value can contain {key}, {}, and {5} style interpolation: + - {key} will be resolved in the config.get; those are considered safe and + spaces will separate args. + - {} works on each arg independently (probably not what you want). + - {5} works fine. + """ + value = self.config.get(section, key) + pieces = shlex.split(value) + return [p.format(*format_args) for p in pieces] + + def badge_scan(self, badge_id): + # Malicious badge "numbers" that contain spaces require this extra work. + command = self._get_command_line("auth", "command", [badge_id]) + # TODO timeout + # TODO test with missing command + rc = subprocess.call(command) + if rc == 0: + self.buzzer.beep() + self.authorized = True + self.badge_id = badge_id + self.expecting_press_timer.set(30) + self.on_button.blink() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + + def on_button_down(self, source): + print("Button down", source) + if not self.authorized: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + return + self.expecting_press_timer.cancel() + self.on_button.on() + self.enable_output.on() + self.buzzer.off() + self.warning_timer.cancel() + self.expire_timer.cancel() + # TODO use extend time if we were already enabled, and run its command for + # logging. + # N.b. Duration (or extend) includes the warning time. + self.warning_timer.set( + self.config.get_int_seconds("auth", "duration", "5m") + - self.config.get_int_seconds("auth", "warning", "10s") + ) + self.expire_timer.set(self.config.get_int_seconds("auth", "duration", "5m")) + if self.noise: + self.noise.kill() + self.noise = None + + def abort(self, source): + print("Abort", source) + self.enable_output.off() + if self.authorized: + command = self._get_command_line("auth", "deauth_command", [self.badge_id]) + subprocess.call(command) + self.off_button.blink(1) + self.buzzer.beep() + self.authorized = False + self.warning_timer.cancel() + self.expecting_press_timer.cancel() + self.expire_timer.cancel() + self.on_button.off() + self.buzzer.off() + if self.noise: + self.noise.kill() + self.noise = None + + def warning(self, unused_source): + self.buzzer.beepbeep() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "warning_filename")] + ) + self.noise = subprocess.Popen( + shlex.split(sound_command), + stdin=DEVNULL, + stdout=DEVNULL, + stderr=DEVNULL, + ) + self.on_button.blink() def main(args): - if not args: - root = '~' - else: - root = args[0] + if not args: + root = "~" + else: + root = args[0] + + config = Config(os.path.join(root, ".authboxrc")) + Dispatcher(config).run_loop() - config = Config(os.path.join(root, '.authboxrc')) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:]) From 779bb18f825502b99cadc9ae180440c53a7d16bd Mon Sep 17 00:00:00 2001 From: Brian Beck Date: Wed, 25 Mar 2026 15:47:02 -0700 Subject: [PATCH 2/2] Implement delayed-off support This allows each output pin to have a configured delay between the abort button being pressed and the pin being disabled. This supports scenarios such as keeping ventilation on longer after a tool has been deactivated. Tested: * Added unit tests for delayed off and reauth during delay timer. All tests pass * Physical hardware: confirmed that previous config file (with no changes, no delay section) continues working as before (all outputs immediately turn off) * Physical hardware: confirmed that outputs stay on during delay and turn off after delay (tested delays 10s, 180s) * Physical hardware: confirmed that deauth and then reauth during the delay timer period leaves the device continuously on with no interruption This commit was generated in part with Gemini 3.1 Pro. Prompt: ``` Write a plan to add the ability to control the two outputs separately, specifically keeping one output alive for a configurable duration after the "off" behavior is triggered. In two_button.ini the parameter `enable_output` currently toggles both `Relay:ActiveHigh:29` and `Relay:ActiveHigh:31`. The logic to read these parameters is in two_button.py It is critical that all changes be backwards compatible - that is, after the change, the old version of the config should trigger the same behavior as today (trigger both outputs out and on at the same time). There should be new syntax options that allow using this new behavior. The code should handle the case where someone triggers an `abort` and then triggers an `on_button_down` before the timer expires. In this case, the output should remain continuously on. * Add a new optional parameter for how long to keep Relay:ActiveHigh:31 after the service is disabled. This duration should be specified in seconds. * In the `abort` function in two_button.py, instead of immediately turning all options off, if a delay >0 is configured, a timer should be started. When the timer expires that output should be disabled ``` --- software/test_two_button.py | 129 ++++++++++++++++++++++++++++++++++ software/two_button.ini | 2 + software/two_button.py | 135 ++++++++++++++++++++++++++++++++---- 3 files changed, 251 insertions(+), 15 deletions(-) diff --git a/software/test_two_button.py b/software/test_two_button.py index ff130c5..0ebb4f9 100755 --- a/software/test_two_button.py +++ b/software/test_two_button.py @@ -15,6 +15,7 @@ """Tests for two_button.py""" import tempfile +import time import unittest from gpiozero import Device @@ -42,6 +43,24 @@ deauth_command = rm -f enabled """ +SAMPLE_CONFIG_DELAYED = b""" +[pins] +on_button=Button:11:38 +off_button=Button:16:37 +enable_output=Relay:ActiveHigh:29, Relay:ActiveHigh:31 +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 1 ] +badge_reader=HIDKeystrokingReader:badge_scanner +buzzer=Buzzer:35 +[auth] +duration=20s +warning=10s +extend=20s + +command = touch enabled +extend_command = touch enabled +deauth_command = rm -f enabled +""" + # This is the fastest way to ensure that basic logic is right, but it does not # test the use of BaseDispatcher.event_queue or the way callbacks happen on the @@ -89,3 +108,113 @@ def test_auth_flow(self): self.dispatcher.abort(None) self.assertFalse(self.dispatcher.authorized) self.assertFalse(self.is_relay_on()) + + +class DelayedOffTest(unittest.TestCase): + def setUp(self): + Device.pin_factory = MockFactory() + + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) + + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG_DELAYED) + f.flush() + config = authbox.config.Config(f.name) + + self.dispatcher = two_button.Dispatcher(config) + for t in self.dispatcher.threads: + if t.__class__.__name__ == "Timer": + t.start() + + def _process_events(self): + while not self.dispatcher.event_queue.empty(): + item = self.dispatcher.event_queue.get_nowait() + if item is authbox.api.SHUTDOWN_SENTINEL: + break + func, args = item[0], item[1:] + func(*args) + + def is_relay_on(self, index_or_name_or_obj): + if isinstance(index_or_name_or_obj, int): + obj = self.dispatcher.outputs[index_or_name_or_obj][0] + elif isinstance(index_or_name_or_obj, str): + obj = getattr(self.dispatcher, index_or_name_or_obj) + else: + obj = index_or_name_or_obj + + if hasattr(obj, "gpio_relay"): + return obj.gpio_relay.value + elif hasattr(obj, "objs"): + return [r.gpio_relay.value for r in obj.objs] + else: + # It might be a mock object or something else + return obj.is_on if hasattr(obj, "is_on") else False # Fallback + + def test_delayed_off(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + + self.assertFalse(self.is_relay_on(0)) + self.assertFalse(self.is_relay_on(1)) + + # Badge scan sets authorized flag + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # "Off" button pressed + self.dispatcher.abort(None) + # The dispatcher state should be not authorized + self.assertFalse(self.dispatcher.authorized) + # Main output (0) should be off immediately + self.assertFalse(self.is_relay_on(0)) + # Delayed output (1) should be ON STILL + self.assertTrue(self.is_relay_on(1)) + + # Wait for delay (1s) + buffer + time.sleep(1.5) + self._process_events() + + # Delayed output should be OFF + self.assertFalse(self.is_relay_on(1)) + + def test_cancel_delayed_off(self): + # Badge scan and turn on + self.dispatcher.badge_scan("1234") + self.dispatcher.on_button_down(None) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Abort + self.dispatcher.abort(None) + self.assertFalse(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait 0.5s, then turn back on + time.sleep(0.5) + self.dispatcher.on_button_down(None) # Resume! + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait for the original delay (1s) to show it was cancelled + time.sleep(1.5) + self._process_events() + + # Should STILL be on! + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) diff --git a/software/two_button.ini b/software/two_button.ini index 0edc4f1..42d121d 100644 --- a/software/two_button.ini +++ b/software/two_button.ini @@ -37,6 +37,8 @@ buzzer = Buzzer:35 badge_reader = HIDKeystrokingReader:HID OMNIKEY 5427 CK # For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for bofa) enable_output = Relay:ActiveHigh:29, Relay:ActiveHigh:31 +# During the specified duration, the outputs will remain on after abort +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 60 ] [auth] # EDIT ME! diff --git a/software/two_button.py b/software/two_button.py index 92b9b0b..3e8d285 100644 --- a/software/two_button.py +++ b/software/two_button.py @@ -24,7 +24,7 @@ import subprocess import sys -from authbox.api import BaseDispatcher +from authbox.api import BaseDispatcher, MultiProxy, split_escaped from authbox.config import Config from authbox.timer import Timer @@ -41,17 +41,54 @@ def __init__(self, config): self.load_config_object("badge_reader", on_scan=self.badge_scan) self.load_config_object("enable_output") self.load_config_object("buzzer") + + # Custom loading for enable_output to support delay + enable_outputs_config = list( + split_escaped(self.config.get("pins", "enable_output"), preserve=True) + ) + + self.delay_map = self._load_delay_map() + + self.outputs = [] + if isinstance(self.enable_output, MultiProxy): + objs = self.enable_output.objs + else: + objs = [self.enable_output] + + for i, obj in enumerate(objs): + if i < len(enable_outputs_config): + pin_str = enable_outputs_config[i].strip() + delay = self.delay_map.get(pin_str, 0) + self.outputs.append((obj, delay)) + else: + self.outputs.append((obj, 0)) + self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning) self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort) self.expecting_press_timer = Timer( self.event_queue, "expecting_press_timer", self.abort ) - # Otherwise, start them manually! + + self.timers = {} self.threads.extend( [self.warning_timer, self.expire_timer, self.expecting_press_timer] ) + for obj, delay in self.outputs: + if delay > 0: + timer_name = f"off_timer_{id(obj)}" + # Lambda captures obj correctly if we use a default arg. + timer = Timer( + self.event_queue, + timer_name, + lambda source, o=obj: self.delayed_off_generic(o), + ) + self.timers[id(obj)] = (timer, delay) + self.threads.append(timer) + self.noise = None + self.delayed_off_running = False + self.running_timers_count = 0 def _get_command_line(self, section, key, format_args): """Constructs a command line, safely. @@ -66,6 +103,43 @@ def _get_command_line(self, section, key, format_args): pieces = shlex.split(value) return [p.format(*format_args) for p in pieces] + def _load_delay_map(self): + delay_map = {} + try: + delays_str = self.config.get("pins", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + if not delay_map: + try: + delays_str = self.config.get("auth", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + return delay_map + + def _parse_delay_str(self, delays_str): + delay_map = {} + if not delays_str: + return delay_map + delays_str = delays_str.replace("[", "").replace("]", "").strip() + pairs = [p.strip() for p in delays_str.split(",")] + for pair in pairs: + if "=" in pair: + k, v = pair.split("=") + k = k.strip() + v = v.strip() + try: + # Use Config.parse_time to support suffixes + from authbox.config import Config + + delay_map[k] = Config.parse_time(v) + except Exception as e: + print("Error parsing delay for", k, v, e) + return delay_map + def badge_scan(self, badge_id): # Malicious badge "numbers" that contain spaces require this extra work. command = self._get_command_line("auth", "command", [badge_id]) @@ -94,21 +168,30 @@ def badge_scan(self, badge_id): def on_button_down(self, source): print("Button down", source) if not self.authorized: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get("sounds", "enable") == "1": - sound_command = self._get_command_line( - "sounds", "command", [self.config.get("sounds", "sad_filename")] - ) - self.noise = subprocess.Popen( - sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL - ) - return + if self.delayed_off_running: + self.authorized = True + self.delayed_off_running = False + self.running_timers_count = 0 + for timer, _ in self.timers.values(): + timer.cancel() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + return self.expecting_press_timer.cancel() self.on_button.on() self.enable_output.on() + for timer, _ in self.timers.values(): + timer.cancel() self.buzzer.off() self.warning_timer.cancel() self.expire_timer.cancel() @@ -126,7 +209,21 @@ def on_button_down(self, source): def abort(self, source): print("Abort", source) - self.enable_output.off() + delayed_count = 0 + for obj, delay in self.outputs: + if delay == 0: + obj.off() + else: + timer, _ = self.timers.get(id(obj), (None, None)) + if timer: + timer.cancel() + timer.set(delay) + delayed_count += 1 + + if delayed_count > 0: + self.delayed_off_running = True + self.running_timers_count = delayed_count + if self.authorized: command = self._get_command_line("auth", "deauth_command", [self.badge_id]) subprocess.call(command) @@ -142,6 +239,14 @@ def abort(self, source): self.noise.kill() self.noise = None + def delayed_off_generic(self, obj): + print("Delayed off generic", obj) + obj.off() + self.running_timers_count -= 1 + if self.running_timers_count <= 0: + self.delayed_off_running = False + self.running_timers_count = 0 + def warning(self, unused_source): self.buzzer.beepbeep() if self.config.get("sounds", "enable") == "1":