diff --git a/software/Makefile b/software/Makefile index a61cab9..38f2898 100644 --- a/software/Makefile +++ b/software/Makefile @@ -1,5 +1,5 @@ .ONESHELL: -SOURCES=authbox setup.py +SOURCES=authbox *.py ifeq ($(shell grep ^ID= /etc/os-release | cut -d = -f 2), raspbian) RASPBIAN=1 diff --git a/software/authbox/__init__.py b/software/authbox/__init__.py index 9cce9d4..db5ea8b 100755 --- a/software/authbox/__init__.py +++ b/software/authbox/__init__.py @@ -19,9 +19,11 @@ __version__ = "2.0.0" try: - from gpiozero import Device - del Device + from gpiozero import Device + + del Device except ModuleNotFoundError: - print("ERROR: 'gpiozero' must be installed") - import sys - sys.exit(1) + print("ERROR: 'gpiozero' must be installed") + import sys + + sys.exit(1) diff --git a/software/authbox/api.py b/software/authbox/api.py index 3308735..4fb7d2e 100755 --- a/software/authbox/api.py +++ b/software/authbox/api.py @@ -130,9 +130,7 @@ def run(self): class BasePinThread(BaseDerivedThread): - def __init__( - self, event_queue, config_name, input_pin, output_pin - ): + def __init__(self, event_queue, config_name, input_pin, output_pin): super(BasePinThread, self).__init__(event_queue, config_name) self.input_pin = input_pin diff --git a/software/authbox/badgereader_hid_keystroking.py b/software/authbox/badgereader_hid_keystroking.py index 2f3e549..8d6ded1 100755 --- a/software/authbox/badgereader_hid_keystroking.py +++ b/software/authbox/badgereader_hid_keystroking.py @@ -21,6 +21,7 @@ from authbox.api import BaseDerivedThread, NoMatchingDevice + class HIDKeystrokingReader(BaseDerivedThread): """Badge reader hardware abstraction. @@ -178,9 +179,9 @@ def __init__( def setUp(self): try: - import evdev + import evdev # noqa: F401 except ModuleNotFoundError: - self.fail("evdev not available") + self.fail("evdev not available") def get_scanner_device(self): """Finds connected device matching device_name. diff --git a/software/authbox/badgereader_wiegand_gpio.py b/software/authbox/badgereader_wiegand_gpio.py index 3fadf0b..9dc9c32 100755 --- a/software/authbox/badgereader_wiegand_gpio.py +++ b/software/authbox/badgereader_wiegand_gpio.py @@ -73,8 +73,8 @@ def __init__( self.timeout_in_seconds = float(timeout_in_ms) / 1000 if self._on_scan: - self.d0_input_device.when_activated = self.decode; - self.d1_input_device.when_activated = self.decode; + self.d0_input_device.when_activated = self.decode + self.d1_input_device.when_activated = self.decode def decode(self, channel): bit = "0" if channel == self.d0_input_device else "1" @@ -119,4 +119,3 @@ def close(self): self.d0_input_device.close() if self.d1_input_device: self.d1_input_device.close() - diff --git a/software/authbox/fake_gpio_for_testing.py b/software/authbox/fake_gpio_for_testing.py index c249812..0a7ea29 100644 --- a/software/authbox/fake_gpio_for_testing.py +++ b/software/authbox/fake_gpio_for_testing.py @@ -16,19 +16,20 @@ from __future__ import print_function -import time +import time # noqa: F401 def _log_match(a, b): return abs(a[0] - b[0]) < 0.1 and a[1] == b[1] and a[2] == b[2] + class FakeTime(object): """Fake for the module 'time' so tests run faster.""" def __init__(self): self.t = 0 - def time(self): + def time(self): # noqa: F811 return self.t def sleep(self, x): diff --git a/software/authbox/gpio_button.py b/software/authbox/gpio_button.py index b52c0c2..b0f03ea 100755 --- a/software/authbox/gpio_button.py +++ b/software/authbox/gpio_button.py @@ -15,10 +15,12 @@ """Abstraction for blinky buttons. """ +import time + +import gpiozero + from authbox.api import BasePinThread from authbox.compat import queue -import gpiozero -import time class Button(BasePinThread): @@ -55,26 +57,26 @@ def __init__( self.steady_state = False self.gpio_led = gpiozero.LED(pin="BOARD" + str(self.output_pin)) button_pin = "BOARD" + str(self.input_pin) - self.gpio_button = gpiozero.Button(button_pin, bounce_time = 0.15) + self.gpio_button = gpiozero.Button(button_pin, bounce_time=0.15) if self._on_down: self.gpio_button.when_pressed = self._callback def _callback(self): """Wrapper to queue events instead of calling them directly.""" # If we have a callback registered, debounce the switch press - if (self._on_down): + if self._on_down: # This is a de-bounce filter to prevent spurious signals from triggering the logic # Looks for 5 continuous active states (each separated by 10ms) - maxcount = 15 # Look for 150ms maximum - lowcount = 0 # Count the number of active states seen - while ((maxcount > 0) and (lowcount <= 4)): - time.sleep(0.01) # 10ms delay between each cycle - maxcount = maxcount - 1 # Decrement remaining cycles - if (self.gpio_button.is_pressed): - lowcount = lowcount + 1 # One more low cycle detected + maxcount = 15 # Look for 150ms maximum + lowcount = 0 # Count the number of active states seen + while (maxcount > 0) and (lowcount <= 4): + time.sleep(0.01) # 10ms delay between each cycle + maxcount = maxcount - 1 # Decrement remaining cycles + if self.gpio_button.is_pressed: + lowcount = lowcount + 1 # One more low cycle detected else: - lowcount = 0 # Not continuously low, reset - if (lowcount > 4): + lowcount = 0 # Not continuously low, reset + if lowcount > 4: self.event_queue.put((self._on_down, self)) def run_inner(self): diff --git a/software/authbox/gpio_buzzer.py b/software/authbox/gpio_buzzer.py index 0056cd0..5e42dc4 100644 --- a/software/authbox/gpio_buzzer.py +++ b/software/authbox/gpio_buzzer.py @@ -16,9 +16,10 @@ """ from __future__ import print_function -import gpiozero import time +import gpiozero + from authbox.api import BasePinThread from authbox.compat import queue diff --git a/software/authbox/gpio_relay.py b/software/authbox/gpio_relay.py index 4c3d976..c418ca2 100755 --- a/software/authbox/gpio_relay.py +++ b/software/authbox/gpio_relay.py @@ -38,11 +38,11 @@ class Relay(BasePinThread): """ def __init__(self, event_queue, config_name, output_type, output_pin): - super(Relay, self).__init__( - event_queue, config_name, None, int(output_pin) - ) + super(Relay, self).__init__(event_queue, config_name, None, int(output_pin)) self.output_on_val = types[output_type] - self.gpio_relay = gpiozero.DigitalOutputDevice("BOARD" + str(output_pin), initial_value= not types[output_type]) + self.gpio_relay = gpiozero.DigitalOutputDevice( + "BOARD" + str(output_pin), initial_value=not types[output_type] + ) # TODO: Push this initial setup into BasePinThread, to avoid a momentary glitch self.off() diff --git a/software/authbox/tests/setup_mock_pin_factory.py b/software/authbox/tests/setup_mock_pin_factory.py index 9b2afb5..c84c1e3 100755 --- a/software/authbox/tests/setup_mock_pin_factory.py +++ b/software/authbox/tests/setup_mock_pin_factory.py @@ -1,10 +1,12 @@ try: - from gpiozero import Device - if not Device.pin_factory: - from gpiozero.pins.mock import MockFactory - Device.pin_factory = MockFactory() + from gpiozero import Device + + if not Device.pin_factory: + from gpiozero.pins.mock import MockFactory + + Device.pin_factory = MockFactory() except ModuleNotFoundError: - print("ERROR: 'gpiozero' must be installed") - import sys - sys.exit(1) + print("ERROR: 'gpiozero' must be installed") + import sys + sys.exit(1) diff --git a/software/authbox/tests/test_api.py b/software/authbox/tests/test_api.py index be81914..f58ec5e 100755 --- a/software/authbox/tests/test_api.py +++ b/software/authbox/tests/test_api.py @@ -14,17 +14,18 @@ """Tests for authbox.api""" -import gpiozero -import gpiozero.pins.mock import tempfile import unittest -import setup_mock_pin_factory +import gpiozero +import gpiozero.pins.mock # noqa: F401 import authbox.api import authbox.config import authbox.gpio_button +from . import setup_mock_pin_factory # noqa: F401 + SAMPLE_CONFIG = b""" [pins] button_a = Button:11:38 @@ -41,17 +42,18 @@ def test_no_duplicate_names(self): self.assertEqual(len(short_names), len(authbox.api.CLASS_REGISTRY)) def test_all_names_importable(self): - try: - import evdev - del evdev - except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") - - for c in authbox.api.CLASS_REGISTRY: - cls = authbox.api._import(c) - assert issubclass( - cls, (authbox.api.BasePinThread, authbox.api.BaseDerivedThread) - ), (c, cls, cls.__bases__) + try: + import evdev + + del evdev + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + + for c in authbox.api.CLASS_REGISTRY: + cls = authbox.api._import(c) + assert issubclass( + cls, (authbox.api.BasePinThread, authbox.api.BaseDerivedThread) + ), (c, cls, cls.__bases__) class DispatcherTest(unittest.TestCase): diff --git a/software/authbox/tests/test_badgereader_hid_keystroking.py b/software/authbox/tests/test_badgereader_hid_keystroking.py index bcdf01d..e704291 100755 --- a/software/authbox/tests/test_badgereader_hid_keystroking.py +++ b/software/authbox/tests/test_badgereader_hid_keystroking.py @@ -17,16 +17,15 @@ import unittest import authbox.badgereader_hid_keystroking - from authbox.compat import queue class BadgereaderTest(unittest.TestCase): def setUp(self): try: - from authbox import fake_evdev_device_for_testing + from authbox import fake_evdev_device_for_testing except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") + self.fail("Test requires evdev, but evdev is not available") authbox.badgereader_hid_keystroking.evdev.list_devices = ( fake_evdev_device_for_testing.list_devices diff --git a/software/authbox/tests/test_badgereader_wiegand_gpio.py b/software/authbox/tests/test_badgereader_wiegand_gpio.py index 37e0fed..1525d6d 100755 --- a/software/authbox/tests/test_badgereader_wiegand_gpio.py +++ b/software/authbox/tests/test_badgereader_wiegand_gpio.py @@ -14,16 +14,15 @@ """Tests for authbox.badgereader_wiegand_gpio""" -import gpiozero import threading import time import unittest -import setup_mock_pin_factory - import authbox.badgereader_wiegand_gpio from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + class BadgereaderWiegandGPIOTest(unittest.TestCase): def setUp(self): @@ -35,7 +34,7 @@ def setUp(self): "40", on_scan=self.on_scan, ) - + def tearDown(self): self.b.close() @@ -70,7 +69,7 @@ def add_bits_later(): def test_limited_queue_size(self): self.b.d1_input_device.pin.drive_low() for i in range(500): - # Send a 0 + # Send a 0 self.b.d0_input_device.pin.drive_high() self.b.d0_input_device.pin.drive_low() self.b.run_inner() diff --git a/software/authbox/tests/test_config.py b/software/authbox/tests/test_config.py index 427cfe1..f6c0ce7 100755 --- a/software/authbox/tests/test_config.py +++ b/software/authbox/tests/test_config.py @@ -16,10 +16,10 @@ import unittest -import setup_mock_pin_factory - import authbox.config +from . import setup_mock_pin_factory # noqa: F401 + class ConfigTest(unittest.TestCase): def test_parse_time(self): diff --git a/software/authbox/tests/test_gpio_button.py b/software/authbox/tests/test_gpio_button.py index 3b9cd17..117c332 100755 --- a/software/authbox/tests/test_gpio_button.py +++ b/software/authbox/tests/test_gpio_button.py @@ -17,12 +17,13 @@ import unittest from functools import partial -import setup_mock_pin_factory - import authbox.gpio_button from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestButton(authbox.gpio_button.Button): def drive_high(self): self.gpio_button.pin.drive_high() @@ -32,7 +33,7 @@ def clear_states(self): self.gpio_led.pin.clear_states() def assert_button_states(self, expected_states): - assert len(self.gpio_button.pin.states) == len(expected_states) + assert len(self.gpio_button.pin.states) == len(expected_states) self.gpio_button.pin.assert_states(expected_states) def assert_led_states(self, expected_states): @@ -43,6 +44,7 @@ def close(self): self.gpio_button.close() self.gpio_led.close() + class ImpatientQueue(queue.Queue): def __init__(self, fake_time): queue.Queue.__init__(self) @@ -55,6 +57,7 @@ def get(self, block, timeout): raise queue.Empty return queue.Queue.get(self, block=block, timeout=timeout) + class BlinkTest(unittest.TestCase): def setUp(self): self.time = fake_gpio_for_testing.FakeTime() @@ -71,15 +74,15 @@ def setUp(self): self.b.clear_states() def tearDown(self): - self.b.close() + self.b.close() def on_down(self): pass def test_on(self): # Verify pins are correctly configured - self.assertEqual('input', self.b.gpio_button.pin._function) - self.assertEqual('output', self.b.gpio_led.pin._function) + self.assertEqual("input", self.b.gpio_button.pin._function) + self.assertEqual("output", self.b.gpio_led.pin._function) self.b.drive_high() self.b.run_inner() @@ -94,7 +97,9 @@ def test_blinking_thread(self): self.b.run_inner() # Number of on/off transitions corresponds to number of run_inner() calls - self.b.assert_led_states([False, True, False, True, False, True, False, True, False]) + self.b.assert_led_states( + [False, True, False, True, False, True, False, True, False] + ) def test_finite_blink_count_from_off(self): self.b.off() @@ -159,10 +164,10 @@ def test_overlapping_blink_counts(self): # won't be user visible because it's the same "on" state as the initial # steady state to which we return. self.b.assert_led_states( - [ - False, # Startup off state + [ + False, # Startup off state True, # Initial steady state - False, # First blink cycle starts + False, # First blink cycle starts True, # Second blink cycle starts False, True, # Second blink cycle ends after 4 cycles diff --git a/software/authbox/tests/test_gpio_buzzer.py b/software/authbox/tests/test_gpio_buzzer.py index 764c567..08d5d92 100755 --- a/software/authbox/tests/test_gpio_buzzer.py +++ b/software/authbox/tests/test_gpio_buzzer.py @@ -16,12 +16,13 @@ import unittest -import setup_mock_pin_factory - import authbox.gpio_buzzer from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestBuzzer(authbox.gpio_buzzer.Buzzer): def assert_states(self, expected_states): assert len(self.gpio_buzzer.pin.states) == len(expected_states) @@ -31,7 +32,8 @@ def clear_states(self): self.gpio_buzzer.pin.clear_states() def close(self): - self.gpio_buzzer.close() + self.gpio_buzzer.close() + class BuzzerTest(unittest.TestCase): def setUp(self): @@ -41,9 +43,9 @@ def setUp(self): self.q = queue.Queue() self.b = TestBuzzer(self.q, "b", "15") self.b.clear_states() - + def tearDown(self): - self.b.close() + self.b.close() def test_on(self): self.time.sleep(2) @@ -59,7 +61,7 @@ def test_off(self): self.assertRaises(queue.Empty, self.b.run_inner, False) self.b.assert_states([False]) - def test_beep(self): + def test_beep(self): self.b.beep() self.b.run_inner(False) self.b.on() diff --git a/software/authbox/tests/test_gpio_relay.py b/software/authbox/tests/test_gpio_relay.py index de44ca6..3a6c750 100755 --- a/software/authbox/tests/test_gpio_relay.py +++ b/software/authbox/tests/test_gpio_relay.py @@ -16,18 +16,21 @@ import unittest -import setup_mock_pin_factory - import authbox.gpio_relay from authbox import fake_gpio_for_testing from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + + class TestRelay(authbox.gpio_relay.Relay): def assert_states(self, expected_states): assert len(self.gpio_relay.pin.states) == len(expected_states) self.gpio_relay.pin.assert_states(expected_states) + def clear_states(self): - self.gpio_relay.pin.clear_states() + self.gpio_relay.pin.clear_states() + class RelayTest(unittest.TestCase): def setUp(self): diff --git a/software/authbox/tests/test_timer.py b/software/authbox/tests/test_timer.py index a67e068..47a5a28 100755 --- a/software/authbox/tests/test_timer.py +++ b/software/authbox/tests/test_timer.py @@ -16,11 +16,11 @@ import unittest -import setup_mock_pin_factory - import authbox.timer from authbox.compat import queue +from . import setup_mock_pin_factory # noqa: F401 + class TimerTest(unittest.TestCase): def setUp(self): diff --git a/software/lockbox.py b/software/lockbox.py index ff53c05..a7256c1 100644 --- a/software/lockbox.py +++ b/software/lockbox.py @@ -19,66 +19,69 @@ """ from __future__ import print_function -import atexit import os -import sys -import subprocess import shlex +import subprocess +import sys from authbox.api import BaseDispatcher from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') - -class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - self.load_config_object('badge_reader', on_scan=self.badge_scan) - self.load_config_object('output_relay') +DEVNULL = open("/dev/null", "r+") - self.disable_timer = Timer(self.event_queue, 'disable_timer', self.disable) - # Otherwise, start them manually! - self.threads.extend([self.disable_timer]) +class Dispatcher(BaseDispatcher): + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + self.load_config_object("badge_reader", on_scan=self.badge_scan) + self.load_config_object("output_relay") + + self.disable_timer = Timer(self.event_queue, "disable_timer", self.disable) + # Otherwise, start them manually! + self.threads.extend([self.disable_timer]) + + def _get_command_line(self, section, key, format_args): + """Constructs a command line, safely. + + The value can contain {key}, {}, and {5} style interpolation: + - {key} will be resolved in the config.get; those are considered safe and + spaces will separate args. + - {} works on each arg independently (probably not what you want). + - {5} works fine. + """ + value = self.config.get(section, key) + pieces = shlex.split(value) + return [p.format(*format_args) for p in pieces] + + def badge_scan(self, badge_id): + # Malicious badge "numbers" that contain spaces require this extra work. + command = self._get_command_line("auth", "command", [badge_id]) + # TODO timeout + # TODO test with missing command + rc = subprocess.call(command) + + if rc == 0: + self.disable_timer.cancel() + self.output_relay.on() + self.disable_timer.set( + self.config.get_int_seconds("auth", "duration", "1s") + ) + + def disable(self, source=None): + self.output_relay.off() - def _get_command_line(self, section, key, format_args): - """Constructs a command line, safely. - - The value can contain {key}, {}, and {5} style interpolation: - - {key} will be resolved in the config.get; those are considered safe and - spaces will separate args. - - {} works on each arg independently (probably not what you want). - - {5} works fine. - """ - value = self.config.get(section, key) - pieces = shlex.split(value) - return [p.format(*format_args) for p in pieces] - - def badge_scan(self, badge_id): - # Malicious badge "numbers" that contain spaces require this extra work. - command = self._get_command_line('auth', 'command', [badge_id]) - # TODO timeout - # TODO test with missing command - rc = subprocess.call(command) - - if rc == 0: - self.disable_timer.cancel() - self.output_relay.on() - self.disable_timer.set(self.config.get_int_seconds('auth', 'duration', '1s')) - - def disable(self, source=None): - self.output_relay.off() def main(args): - if not args: - root = '~' - else: - root = args[0] + if not args: + root = "~" + else: + root = args[0] + + config = Config(os.path.join(root, ".authboxrc")) + Dispatcher(config).run_loop() - config = Config(os.path.join(root, '.authboxrc')) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/software/qa.py b/software/qa.py index 9c5371f..cb47fd5 100644 --- a/software/qa.py +++ b/software/qa.py @@ -26,54 +26,56 @@ from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') +DEVNULL = open("/dev/null", "r+") + class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - for i in range(1, 6+1): - self.load_config_object('j%d' % i, on_down=self.on_button_down) - - self.load_config_object('buzzer') - self.load_config_object('relays') - # This may be a MultiProxy, which makes this easier. - self.relays.on() - self.relay_value = True - - # Run fans for 10 seconds on startup - self.relay_timer = Timer(self.event_queue, 'relay_timer', self.toggle_relays) - self.relay_toggle_interval = 10 - self.relay_timer.set(self.relay_toggle_interval) - self.threads.extend([self.relay_timer]) - - def on_button_down(self, source): - print("Button down", source) - self.buzzer.beep() - - source.on() - time.sleep(0.3) - source.off() - - def toggle_relays(self, source): - print("Toggle relay", self.relay_value) - if self.relay_value: - self.relays.off() - self.relay_value = False - else: - self.relays.on() - self.relay_value = True - self.relay_timer.set(self.relay_toggle_interval) + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + for i in range(1, 6 + 1): + self.load_config_object("j%d" % i, on_down=self.on_button_down) + + self.load_config_object("buzzer") + self.load_config_object("relays") + # This may be a MultiProxy, which makes this easier. + self.relays.on() + self.relay_value = True + + # Run fans for 10 seconds on startup + self.relay_timer = Timer(self.event_queue, "relay_timer", self.toggle_relays) + self.relay_toggle_interval = 10 + self.relay_timer.set(self.relay_toggle_interval) + self.threads.extend([self.relay_timer]) + + def on_button_down(self, source): + print("Button down", source) + self.buzzer.beep() + + source.on() + time.sleep(0.3) + source.off() + + def toggle_relays(self, source): + print("Toggle relay", self.relay_value) + if self.relay_value: + self.relays.off() + self.relay_value = False + else: + self.relays.on() + self.relay_value = True + self.relay_timer.set(self.relay_toggle_interval) def main(args): - if not args: - config_filename = 'qa.ini' - else: - config_filename = args[0] + if not args: + config_filename = "qa.ini" + else: + config_filename = args[0] + + config = Config(config_filename) + Dispatcher(config).run_loop() - config = Config(config_filename) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/software/requirements-dev.txt b/software/requirements-dev.txt index 8fb0edf..942a649 100755 --- a/software/requirements-dev.txt +++ b/software/requirements-dev.txt @@ -1,8 +1,8 @@ gpiozero==2.0 -nose==1.3. +nose==1.3 coverage==7.2.1 evdev==1.6.1 isort==5.11.5 black==23.1.0 ; python_version >= '3.6' tox==3.28.0 -flake8==3.9.2 +flake8==5.0.4 diff --git a/software/test_qa.py b/software/test_qa.py index 5ea3216..7ddca1e 100644 --- a/software/test_qa.py +++ b/software/test_qa.py @@ -14,8 +14,8 @@ import unittest + class QATest(unittest.TestCase): # TODO def test_import(self): - import qa - + import qa # noqa: F401 diff --git a/software/test_two_button.py b/software/test_two_button.py index e6594b0..0ebb4f9 100755 --- a/software/test_two_button.py +++ b/software/test_two_button.py @@ -14,22 +14,19 @@ """Tests for two_button.py""" -import sys -import unittest import tempfile -import threading import time +import unittest from gpiozero import Device from gpiozero.pins.mock import MockFactory -import two_button - import authbox.api import authbox.badgereader_hid_keystroking -from authbox import fake_gpio_for_testing +import two_button +from authbox import fake_gpio_for_testing # noqa: F401 -SAMPLE_CONFIG = b''' +SAMPLE_CONFIG = b""" [pins] on_button=Button:11:38 off_button=Button:16:37 @@ -44,47 +41,180 @@ command = touch enabled extend_command = touch enabled deauth_command = rm -f enabled -''' +""" + +SAMPLE_CONFIG_DELAYED = b""" +[pins] +on_button=Button:11:38 +off_button=Button:16:37 +enable_output=Relay:ActiveHigh:29, Relay:ActiveHigh:31 +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 1 ] +badge_reader=HIDKeystrokingReader:badge_scanner +buzzer=Buzzer:35 +[auth] +duration=20s +warning=10s +extend=20s + +command = touch enabled +extend_command = touch enabled +deauth_command = rm -f enabled +""" + # This is the fastest way to ensure that basic logic is right, but it does not # test the use of BaseDispatcher.event_queue or the way callbacks happen on the # same thread serialized. class SimpleDispatcherTest(unittest.TestCase): - def setUp(self): - Device.pin_factory = MockFactory() - - try: - from authbox import fake_evdev_device_for_testing - except ModuleNotFoundError: - self.fail("Test requires evdev, but evdev is not available") - authbox.badgereader_hid_keystroking.evdev.list_devices = fake_evdev_device_for_testing.list_devices - authbox.badgereader_hid_keystroking.evdev.InputDevice = fake_evdev_device_for_testing.InputDevice - - with tempfile.NamedTemporaryFile() as f: - f.write(SAMPLE_CONFIG) - f.flush() - config = authbox.config.Config(f.name) - - self.dispatcher = two_button.Dispatcher(config) - - def is_relay_on(self): - relay = getattr(self.dispatcher, "enable_output") - return relay.gpio_relay.value - - def test_auth_flow(self): - # Out of the box, relay should be off - self.assertFalse(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) - # Badge scan sets authorized flag, but doesn't enable relay until button - # press. - self.dispatcher.badge_scan('1234') - self.assertTrue(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) - # "On" button pressed - self.dispatcher.on_button_down(None) - self.assertTrue(self.dispatcher.authorized) - self.assertTrue(self.is_relay_on()) - # "Off" button pressed - self.dispatcher.abort(None) - self.assertFalse(self.dispatcher.authorized) - self.assertFalse(self.is_relay_on()) + def setUp(self): + Device.pin_factory = MockFactory() + + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) + + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG) + f.flush() + config = authbox.config.Config(f.name) + + self.dispatcher = two_button.Dispatcher(config) + + def is_relay_on(self): + relay = getattr(self.dispatcher, "enable_output") + return relay.gpio_relay.value + + def test_auth_flow(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) + # Badge scan sets authorized flag, but doesn't enable relay until button + # press. + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on()) + # "Off" button pressed + self.dispatcher.abort(None) + self.assertFalse(self.dispatcher.authorized) + self.assertFalse(self.is_relay_on()) + + +class DelayedOffTest(unittest.TestCase): + def setUp(self): + Device.pin_factory = MockFactory() + + try: + from authbox import fake_evdev_device_for_testing + except ModuleNotFoundError: + self.fail("Test requires evdev, but evdev is not available") + authbox.badgereader_hid_keystroking.evdev.list_devices = ( + fake_evdev_device_for_testing.list_devices + ) + authbox.badgereader_hid_keystroking.evdev.InputDevice = ( + fake_evdev_device_for_testing.InputDevice + ) + + with tempfile.NamedTemporaryFile() as f: + f.write(SAMPLE_CONFIG_DELAYED) + f.flush() + config = authbox.config.Config(f.name) + + self.dispatcher = two_button.Dispatcher(config) + for t in self.dispatcher.threads: + if t.__class__.__name__ == "Timer": + t.start() + + def _process_events(self): + while not self.dispatcher.event_queue.empty(): + item = self.dispatcher.event_queue.get_nowait() + if item is authbox.api.SHUTDOWN_SENTINEL: + break + func, args = item[0], item[1:] + func(*args) + + def is_relay_on(self, index_or_name_or_obj): + if isinstance(index_or_name_or_obj, int): + obj = self.dispatcher.outputs[index_or_name_or_obj][0] + elif isinstance(index_or_name_or_obj, str): + obj = getattr(self.dispatcher, index_or_name_or_obj) + else: + obj = index_or_name_or_obj + + if hasattr(obj, "gpio_relay"): + return obj.gpio_relay.value + elif hasattr(obj, "objs"): + return [r.gpio_relay.value for r in obj.objs] + else: + # It might be a mock object or something else + return obj.is_on if hasattr(obj, "is_on") else False # Fallback + + def test_delayed_off(self): + # Out of the box, relay should be off + self.assertFalse(self.dispatcher.authorized) + + self.assertFalse(self.is_relay_on(0)) + self.assertFalse(self.is_relay_on(1)) + + # Badge scan sets authorized flag + self.dispatcher.badge_scan("1234") + self.assertTrue(self.dispatcher.authorized) + + # "On" button pressed + self.dispatcher.on_button_down(None) + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # "Off" button pressed + self.dispatcher.abort(None) + # The dispatcher state should be not authorized + self.assertFalse(self.dispatcher.authorized) + # Main output (0) should be off immediately + self.assertFalse(self.is_relay_on(0)) + # Delayed output (1) should be ON STILL + self.assertTrue(self.is_relay_on(1)) + + # Wait for delay (1s) + buffer + time.sleep(1.5) + self._process_events() + + # Delayed output should be OFF + self.assertFalse(self.is_relay_on(1)) + + def test_cancel_delayed_off(self): + # Badge scan and turn on + self.dispatcher.badge_scan("1234") + self.dispatcher.on_button_down(None) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Abort + self.dispatcher.abort(None) + self.assertFalse(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait 0.5s, then turn back on + time.sleep(0.5) + self.dispatcher.on_button_down(None) # Resume! + self.assertTrue(self.dispatcher.authorized) + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) + + # Wait for the original delay (1s) to show it was cancelled + time.sleep(1.5) + self._process_events() + + # Should STILL be on! + self.assertTrue(self.is_relay_on(0)) + self.assertTrue(self.is_relay_on(1)) diff --git a/software/two_button.ini b/software/two_button.ini index 0edc4f1..42d121d 100644 --- a/software/two_button.ini +++ b/software/two_button.ini @@ -37,6 +37,8 @@ buzzer = Buzzer:35 badge_reader = HIDKeystrokingReader:HID OMNIKEY 5427 CK # For Authboard v0.4 29=J13 (small relay for interlock), 31=J12 (small relay for bofa) enable_output = Relay:ActiveHigh:29, Relay:ActiveHigh:31 +# During the specified duration, the outputs will remain on after abort +output_off_delay_seconds = [ Relay:ActiveHigh:29 = 0, Relay:ActiveHigh:31 = 60 ] [auth] # EDIT ME! diff --git a/software/two_button.py b/software/two_button.py index 5383df1..3e8d285 100644 --- a/software/two_button.py +++ b/software/two_button.py @@ -19,131 +19,258 @@ """ from __future__ import print_function -import atexit import os -import sys -import subprocess import shlex +import subprocess +import sys -from authbox.api import BaseDispatcher +from authbox.api import BaseDispatcher, MultiProxy, split_escaped from authbox.config import Config from authbox.timer import Timer -DEVNULL = open('/dev/null', 'r+') +DEVNULL = open("/dev/null", "r+") + class Dispatcher(BaseDispatcher): - def __init__(self, config): - super(Dispatcher, self).__init__(config) - - self.authorized = False - self.load_config_object('on_button', on_down=self.on_button_down) - self.load_config_object('off_button', on_down=self.abort) - self.load_config_object('badge_reader', on_scan=self.badge_scan) - self.load_config_object('enable_output') - self.load_config_object('buzzer') - self.warning_timer = Timer(self.event_queue, 'warning_timer', self.warning) - self.expire_timer = Timer(self.event_queue, 'expire_timer', self.abort) - self.expecting_press_timer = Timer(self.event_queue, 'expecting_press_timer', self.abort) - # Otherwise, start them manually! - self.threads.extend([self.warning_timer, self.expire_timer, self.expecting_press_timer]) - - self.noise = None - - def _get_command_line(self, section, key, format_args): - """Constructs a command line, safely. - - The value can contain {key}, {}, and {5} style interpolation: - - {key} will be resolved in the config.get; those are considered safe and - spaces will separate args. - - {} works on each arg independently (probably not what you want). - - {5} works fine. - """ - value = self.config.get(section, key) - pieces = shlex.split(value) - return [p.format(*format_args) for p in pieces] - - def badge_scan(self, badge_id): - # Malicious badge "numbers" that contain spaces require this extra work. - command = self._get_command_line('auth', 'command', [badge_id]) - # TODO timeout - # TODO test with missing command - rc = subprocess.call(command) - if rc == 0: - self.buzzer.beep() - self.authorized = True - self.badge_id = badge_id - self.expecting_press_timer.set(30) - self.on_button.blink() - else: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'sad_filename')]) - self.noise = subprocess.Popen(sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - - def on_button_down(self, source): - print("Button down", source) - if not self.authorized: - self.off_button.blink(1) - self.buzzer.beep() - if self.noise: - self.noise.kill() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'sad_filename')]) - self.noise = subprocess.Popen(sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - return - self.expecting_press_timer.cancel() - self.on_button.on() - self.enable_output.on() - self.buzzer.off() - self.warning_timer.cancel() - self.expire_timer.cancel() - # TODO use extend time if we were already enabled, and run its command for - # logging. - # N.b. Duration (or extend) includes the warning time. - self.warning_timer.set(self.config.get_int_seconds('auth', 'duration', '5m') - - self.config.get_int_seconds('auth', 'warning', '10s')) - self.expire_timer.set(self.config.get_int_seconds('auth', 'duration', '5m')) - if self.noise: - self.noise.kill() - self.noise = None - - def abort(self, source): - print("Abort", source) - self.enable_output.off() - if self.authorized: - command = self._get_command_line('auth', 'deauth_command', [self.badge_id]) - subprocess.call(command) - self.off_button.blink(1) - self.buzzer.beep() - self.authorized = False - self.warning_timer.cancel() - self.expecting_press_timer.cancel() - self.expire_timer.cancel() - self.on_button.off() - self.buzzer.off() - if self.noise: - self.noise.kill() - self.noise = None - - def warning(self, unused_source): - self.buzzer.beepbeep() - if self.config.get('sounds', 'enable') == '1': - sound_command = self._get_command_line('sounds', 'command', [self.config.get('sounds', 'warning_filename')]) - self.noise = subprocess.Popen(shlex.split(sound_command), stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL) - self.on_button.blink() + def __init__(self, config): + super(Dispatcher, self).__init__(config) + + self.authorized = False + self.load_config_object("on_button", on_down=self.on_button_down) + self.load_config_object("off_button", on_down=self.abort) + self.load_config_object("badge_reader", on_scan=self.badge_scan) + self.load_config_object("enable_output") + self.load_config_object("buzzer") + + # Custom loading for enable_output to support delay + enable_outputs_config = list( + split_escaped(self.config.get("pins", "enable_output"), preserve=True) + ) + + self.delay_map = self._load_delay_map() + + self.outputs = [] + if isinstance(self.enable_output, MultiProxy): + objs = self.enable_output.objs + else: + objs = [self.enable_output] + + for i, obj in enumerate(objs): + if i < len(enable_outputs_config): + pin_str = enable_outputs_config[i].strip() + delay = self.delay_map.get(pin_str, 0) + self.outputs.append((obj, delay)) + else: + self.outputs.append((obj, 0)) + + self.warning_timer = Timer(self.event_queue, "warning_timer", self.warning) + self.expire_timer = Timer(self.event_queue, "expire_timer", self.abort) + self.expecting_press_timer = Timer( + self.event_queue, "expecting_press_timer", self.abort + ) + + self.timers = {} + self.threads.extend( + [self.warning_timer, self.expire_timer, self.expecting_press_timer] + ) + + for obj, delay in self.outputs: + if delay > 0: + timer_name = f"off_timer_{id(obj)}" + # Lambda captures obj correctly if we use a default arg. + timer = Timer( + self.event_queue, + timer_name, + lambda source, o=obj: self.delayed_off_generic(o), + ) + self.timers[id(obj)] = (timer, delay) + self.threads.append(timer) + + self.noise = None + self.delayed_off_running = False + self.running_timers_count = 0 + + def _get_command_line(self, section, key, format_args): + """Constructs a command line, safely. + + The value can contain {key}, {}, and {5} style interpolation: + - {key} will be resolved in the config.get; those are considered safe and + spaces will separate args. + - {} works on each arg independently (probably not what you want). + - {5} works fine. + """ + value = self.config.get(section, key) + pieces = shlex.split(value) + return [p.format(*format_args) for p in pieces] + + def _load_delay_map(self): + delay_map = {} + try: + delays_str = self.config.get("pins", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + if not delay_map: + try: + delays_str = self.config.get("auth", "output_off_delay_seconds") + delay_map = self._parse_delay_str(delays_str) + except Exception: + pass + + return delay_map + + def _parse_delay_str(self, delays_str): + delay_map = {} + if not delays_str: + return delay_map + delays_str = delays_str.replace("[", "").replace("]", "").strip() + pairs = [p.strip() for p in delays_str.split(",")] + for pair in pairs: + if "=" in pair: + k, v = pair.split("=") + k = k.strip() + v = v.strip() + try: + # Use Config.parse_time to support suffixes + from authbox.config import Config + + delay_map[k] = Config.parse_time(v) + except Exception as e: + print("Error parsing delay for", k, v, e) + return delay_map + + def badge_scan(self, badge_id): + # Malicious badge "numbers" that contain spaces require this extra work. + command = self._get_command_line("auth", "command", [badge_id]) + # TODO timeout + # TODO test with missing command + rc = subprocess.call(command) + if rc == 0: + self.buzzer.beep() + self.authorized = True + self.badge_id = badge_id + self.expecting_press_timer.set(30) + self.on_button.blink() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + + def on_button_down(self, source): + print("Button down", source) + if not self.authorized: + if self.delayed_off_running: + self.authorized = True + self.delayed_off_running = False + self.running_timers_count = 0 + for timer, _ in self.timers.values(): + timer.cancel() + else: + self.off_button.blink(1) + self.buzzer.beep() + if self.noise: + self.noise.kill() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "sad_filename")] + ) + self.noise = subprocess.Popen( + sound_command, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL + ) + return + self.expecting_press_timer.cancel() + self.on_button.on() + self.enable_output.on() + for timer, _ in self.timers.values(): + timer.cancel() + self.buzzer.off() + self.warning_timer.cancel() + self.expire_timer.cancel() + # TODO use extend time if we were already enabled, and run its command for + # logging. + # N.b. Duration (or extend) includes the warning time. + self.warning_timer.set( + self.config.get_int_seconds("auth", "duration", "5m") + - self.config.get_int_seconds("auth", "warning", "10s") + ) + self.expire_timer.set(self.config.get_int_seconds("auth", "duration", "5m")) + if self.noise: + self.noise.kill() + self.noise = None + + def abort(self, source): + print("Abort", source) + delayed_count = 0 + for obj, delay in self.outputs: + if delay == 0: + obj.off() + else: + timer, _ = self.timers.get(id(obj), (None, None)) + if timer: + timer.cancel() + timer.set(delay) + delayed_count += 1 + + if delayed_count > 0: + self.delayed_off_running = True + self.running_timers_count = delayed_count + + if self.authorized: + command = self._get_command_line("auth", "deauth_command", [self.badge_id]) + subprocess.call(command) + self.off_button.blink(1) + self.buzzer.beep() + self.authorized = False + self.warning_timer.cancel() + self.expecting_press_timer.cancel() + self.expire_timer.cancel() + self.on_button.off() + self.buzzer.off() + if self.noise: + self.noise.kill() + self.noise = None + + def delayed_off_generic(self, obj): + print("Delayed off generic", obj) + obj.off() + self.running_timers_count -= 1 + if self.running_timers_count <= 0: + self.delayed_off_running = False + self.running_timers_count = 0 + + def warning(self, unused_source): + self.buzzer.beepbeep() + if self.config.get("sounds", "enable") == "1": + sound_command = self._get_command_line( + "sounds", "command", [self.config.get("sounds", "warning_filename")] + ) + self.noise = subprocess.Popen( + shlex.split(sound_command), + stdin=DEVNULL, + stdout=DEVNULL, + stderr=DEVNULL, + ) + self.on_button.blink() def main(args): - if not args: - root = '~' - else: - root = args[0] + if not args: + root = "~" + else: + root = args[0] + + config = Config(os.path.join(root, ".authboxrc")) + Dispatcher(config).run_loop() - config = Config(os.path.join(root, '.authboxrc')) - Dispatcher(config).run_loop() -if __name__ == '__main__': - main(sys.argv[1:]) +if __name__ == "__main__": + main(sys.argv[1:])