From f3341afab307727800cb0311bfebf7c3934ce113 Mon Sep 17 00:00:00 2001 From: rbroderi Date: Sun, 17 Apr 2022 10:59:45 -0400 Subject: [PATCH 1/6] Update __init__.py add _clean_up() method and call from stop_listening as cleanup code was not running after #listen section. --- src/sshkeyboard/__init__.py | 31 ++++++--- src/sshkeyboard/__init__.pyi | 69 ++++++++++++++++++++ src/sshkeyboard/_asyncio_run_backport_36.pyi | 15 +++++ 3 files changed, 106 insertions(+), 9 deletions(-) create mode 100644 src/sshkeyboard/__init__.pyi create mode 100644 src/sshkeyboard/_asyncio_run_backport_36.pyi diff --git a/src/sshkeyboard/__init__.py b/src/sshkeyboard/__init__.py index bfb7739..ccc1139 100644 --- a/src/sshkeyboard/__init__.py +++ b/src/sshkeyboard/__init__.py @@ -235,6 +235,9 @@ def release(key): asyncio.run(coro) +_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None + + async def listen_keyboard_manual( on_press: Optional[Callable[[str], Any]] = None, on_release: Optional[Callable[[str], Any]] = None, @@ -272,6 +275,7 @@ async def listen_keyboard_manual( global _running global _should_run + global _executor # Check the system assert sys.version_info >= (3, 6), ( "sshkeyboard requires Python version 3.6+, you have " @@ -309,20 +313,20 @@ async def listen_keyboard_manual( _should_run = True # Create thread pool executor only if it will get used - executor = None + # executor = None if not sequential and ( not asyncio.iscoroutinefunction(on_press) or not asyncio.iscoroutinefunction(on_release) ): - executor = concurrent.futures.ThreadPoolExecutor( + _executor = concurrent.futures.ThreadPoolExecutor( max_workers=max_thread_pool_workers ) # Package parameters into namespaces so they are easier to pass around # Options do not change options = SimpleNamespace( - on_press_callback=_callback(on_press, sequential, executor), - on_release_callback=_callback(on_release, sequential, executor), + on_press_callback=_callback(on_press, sequential, _executor), + on_release_callback=_callback(on_release, sequential, _executor), until=until, sequential=sequential, delay_second_char=delay_second_char, @@ -345,9 +349,16 @@ async def listen_keyboard_manual( state = await _react_to_input(state, options) await asyncio.sleep(sleep) + _clean_up() + + +def _clean_up(): + global _running + global _should_run + global _executor # Cleanup - if executor is not None: - executor.shutdown() + if _executor is not None: + _executor.shutdown() _running = False _should_run = False @@ -374,6 +385,7 @@ def press(key): if _running: global _should_run _should_run = False + _clean_up() def _is_python_36(): @@ -447,7 +459,7 @@ async def _cb(key): # http://ballingt.com/_nonblocking-stdin-in-python-3/ @contextmanager def _raw(stream): - # Not required on Windows + # Not required on windows if _is_windows: yield return @@ -462,7 +474,7 @@ def _raw(stream): @contextmanager def _nonblocking(stream): - # Not required on Windows + # Not required on windows if _is_windows: yield return @@ -612,7 +624,8 @@ async def press(key): def release(key): print(f"'{key}' released") - print("listening keyboard, press keys, and press 'esc' to exit") + # Sync version + print("listening_keyboard(), press keys, and press 'esc' to exit") listen_keyboard(on_press=press, on_release=release) # ^this is the same as # asyncio.run(listen_keyboard_manual(press, release)) diff --git a/src/sshkeyboard/__init__.pyi b/src/sshkeyboard/__init__.pyi new file mode 100644 index 0000000..1d295b6 --- /dev/null +++ b/src/sshkeyboard/__init__.pyi @@ -0,0 +1,69 @@ +""" +This type stub file was generated by pyright. +""" + +# import asyncio +# import concurrent.futures +# import fcntl +# import msvcrt +# import os +# import sys +# import termios +# import traceback +# import tty +# from contextlib import contextmanager +# from inspect import signature +# from platform import system +# from time import time +# from types import SimpleNamespace +from typing import Any, Callable, Optional, TypeAlias + +"""sshkeyboard""" +__version__: str = ... +_is_windows: bool = ... +_running: bool = ... +_should_run: bool = ... +_UNIX_ANSI_CHAR_TO_READABLE: dict[str, str] = ... +_WIN_CHAR_TO_READABLE: dict[str, str] = ... +_CHAR_TO_READABLE: dict[str, str] = ... +_WIN_SPECIAL_CHAR_STARTS: set[str] = ... +_WIN_REQUIRES_TWO_READS_STARTS: set[str] = ... +Key: TypeAlias = str + +def listen_keyboard( + on_press: Optional[Callable[[str], Any]] = ..., + on_release: Optional[Callable[[str], Any]] = ..., + until: Optional[str] = ..., + sequential: bool = ..., + delay_second_char: float = ..., + delay_other_chars: float = ..., + lower: bool = ..., + debug: bool = ..., + max_thread_pool_workers: Optional[int] = ..., + sleep: float = ..., +) -> None: + """Listen for keyboard events and f""" + ... + +async def listen_keyboard_manual( + on_press: Optional[Callable[[str], Any]] = ..., + on_release: Optional[Callable[[str], Any]] = ..., + until: Optional[str] = ..., + sequential: bool = ..., + delay_second_char: float = ..., + delay_other_chars: float = ..., + lower: bool = ..., + debug: bool = ..., + max_thread_pool_workers: Optional[int] = ..., + sleep: float = ..., +) -> None: + """The same as :func:`~sshkeyboard.""" + ... + +def stop_listening() -> None: + """Stops the ongoing keyboard listener.""" + ... + +if __name__ == "__main__": + async def press(key): ... + def release(key): ... diff --git a/src/sshkeyboard/_asyncio_run_backport_36.pyi b/src/sshkeyboard/_asyncio_run_backport_36.pyi new file mode 100644 index 0000000..6ffaf59 --- /dev/null +++ b/src/sshkeyboard/_asyncio_run_backport_36.pyi @@ -0,0 +1,15 @@ +""" +This type stub file was generated by pyright. +""" + +from typing import Any, Awaitable, Coroutine, TypeVar, Union + +""" +Backport of the asyncio.runners""" +__all__ = ("run36", ) +_T = TypeVar("_T") +def run36(main: Union[Coroutine[Any, None, _T], Awaitable[_T]], *, debug: bool = ...) -> _T: + """Run a coroutine. + This functi""" + ... + From 123431b6926fb81f609ec8e2da21966f4ad67dfa Mon Sep 17 00:00:00 2001 From: rbroderi Date: Sun, 17 Apr 2022 19:17:00 -0400 Subject: [PATCH 2/6] moved to class, cleanup, typehint, refactor --- src/sshkeyboard/__init__.py | 630 +------------------------------- src/sshkeyboard/__init__.pyi | 69 ---- src/sshkeyboard/key_listener.py | 557 ++++++++++++++++++++++++++++ 3 files changed, 559 insertions(+), 697 deletions(-) delete mode 100644 src/sshkeyboard/__init__.pyi create mode 100644 src/sshkeyboard/key_listener.py diff --git a/src/sshkeyboard/__init__.py b/src/sshkeyboard/__init__.py index ccc1139..857ad8a 100644 --- a/src/sshkeyboard/__init__.py +++ b/src/sshkeyboard/__init__.py @@ -1,631 +1,5 @@ """sshkeyboard""" -__version__ = "2.3.1" +__version__ = "3.0.0" -import asyncio -import concurrent.futures -import os -import sys -import traceback -from contextlib import contextmanager -from inspect import signature -from platform import system -from time import time -from types import SimpleNamespace -from typing import Any, Callable, Optional - -try: - from ._asyncio_run_backport_36 import run36 -except ImportError: # this allows local testing: python __init__.py - from _asyncio_run_backport_36 import run36 - -_is_windows = system().lower() == "windows" - -if _is_windows: - import msvcrt -else: - import fcntl - import termios - import tty - - -# Global state - -# Makes sure only listener can be started at a time -_running = False -# Makes sure listener stops if error has been raised -# inside thread pool executor or asyncio task or -# stop_listening() has been called -_should_run = False - -# Readable representations for selected ansi characters -# All possible ansi characters here: -# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/input/ansi_escape_sequences.py -# Listener does not support modifier keys for now -_UNIX_ANSI_CHAR_TO_READABLE = { - # 'Regular' characters - "\x1b": "esc", - "\x7f": "backspace", - "\x1b[2~": "insert", - "\x1b[3~": "delete", - "\x1b[5~": "pageup", - "\x1b[6~": "pagedown", - "\x1b[H": "home", - "\x1b[F": "end", - "\x1b[A": "up", - "\x1b[B": "down", - "\x1b[C": "right", - "\x1b[D": "left", - "\x1bOP": "f1", - "\x1bOQ": "f2", - "\x1bOR": "f3", - "\x1bOS": "f4", - "\x1b[15~": "f5", - "\x1b[17~": "f6", - "\x1b[18~": "f7", - "\x1b[19~": "f8", - "\x1b[20~": "f9", - "\x1b[21~": "f10", - "\x1b[23~": "f11", - "\x1b[24~": "f12", - "\x1b[25~": "f13", - "\x1b[26~": "f14", - "\x1b[28~": "f15", - "\x1b[29~": "f16", - "\x1b[31~": "f17", - "\x1b[32~": "f18", - "\x1b[33~": "f19", - "\x1b[34~": "f20", - # Special/duplicate: - # Tmux, Emacs - "\x1bOH": "home", - "\x1bOF": "end", - "\x1bOA": "up", - "\x1bOB": "down", - "\x1bOC": "right", - "\x1bOD": "left", - # Rrvt - "\x1b[1~": "home", - "\x1b[4~": "end", - "\x1b[11~": "f1", - "\x1b[12~": "f2", - "\x1b[13~": "f3", - "\x1b[14~": "f4", - # Linux console - "\x1b[[A": "f1", - "\x1b[[B": "f2", - "\x1b[[C": "f3", - "\x1b[[D": "f4", - "\x1b[[E": "f5", - # Xterm - "\x1b[1;2P": "f13", - "\x1b[1;2Q": "f14", - "\x1b[1;2S": "f16", - "\x1b[15;2~": "f17", - "\x1b[17;2~": "f18", - "\x1b[18;2~": "f19", - "\x1b[19;2~": "f20", - "\x1b[20;2~": "f21", - "\x1b[21;2~": "f22", - "\x1b[23;2~": "f23", - "\x1b[24;2~": "f24", -} - -_WIN_CHAR_TO_READABLE = { - "\x1b": "esc", - "\x08": "backspace", - "àR": "insert", - "àS": "delete", - "àI": "pageup", - "àQ": "pagedown", - "àG": "home", - "àO": "end", - "àH": "up", - "àP": "down", - "àM": "right", - "àK": "left", - "\x00;": "f1", - "\x00<": "f2", - "\x00=": "f3", - "\x00>": "f4", - "\x00?": "f5", - "\x00@": "f6", - "\x00A": "f7", - "\x00B": "f8", - "\x00C": "f9", - "\x00D": "f10", - # "": "f11", ? - "à†": "f12", -} - -# Some non-ansi characters that need a readable representation -_CHAR_TO_READABLE = { - "\t": "tab", - "\n": "enter", - "\r": "enter", - " ": "space", -} - -_WIN_SPECIAL_CHAR_STARTS = {"\x1b", "\x08", "\x00", "\xe0"} -_WIN_REQUIRES_TWO_READS_STARTS = {"\x00", "\xe0"} - - -def listen_keyboard( - on_press: Optional[Callable[[str], Any]] = None, - on_release: Optional[Callable[[str], Any]] = None, - until: Optional[str] = "esc", - sequential: bool = False, - delay_second_char: float = 0.75, - delay_other_chars: float = 0.05, - lower: bool = True, - debug: bool = False, - max_thread_pool_workers: Optional[int] = None, - sleep: float = 0.01, -) -> None: - - """Listen for keyboard events and fire `on_press` and `on_release` callback - functions - - Supports asynchronous callbacks also. - - Blocks the thread until the key in `until` parameter has been pressed, an - error has been raised or :func:`~sshkeyboard.stop_listening` has been - called. - - Simple example with asynchronous and regular callbacks: - - .. code-block:: python - - from sshkeyboard import listen_keyboard - - async def press(key): - print(f"'{key}' pressed") - - def release(key): - print(f"'{key}' released") - - listen_keyboard( - on_press=press, - on_release=release, - ) - - Args: - on_press: Function that gets called when a key is pressed. The - function takes the pressed key as parameter. Defaults to None. - on_release: Function that gets called when a key is released. The - function takes the released key as parameter. Defaults to None. - until: A key that will end keyboard listening. None means that - listening will stop only when :func:`~sshkeyboard.stop_listening` - has been called or an error has been raised. Defaults to "esc". - sequential: If enabled, callbacks will be forced to happen one by - one instead of concurrently or asynchronously. Defaults to False. - delay_second_char: The timeout between first and second character when - holding down a key. Depends on terminal and is used for parsing - the input. Defaults to 0.75. - delay_other_chars: The timeout between all other characters when - holding down a key. Depends on terminal and is used for parsing - the input. Defaults to 0.05. - lower: If enabled, the callback 'key' parameter gets turned into lower - case key even if it was upper case, for example "A" -> "a". - Defaults to True. - debug: Print debug messages. Defaults to False. - max_thread_pool_workers: Define the number of workers in - ThreadPoolExecutor, None means that a default value will get used. - Will get ignored if sequential=True. Defaults to None. - sleep: asyncio.sleep() amount between attempted keyboard input reads. - Defaults to 0.01. - """ - - coro = listen_keyboard_manual( - on_press, - on_release, - until, - sequential, - delay_second_char, - delay_other_chars, - lower, - debug, - max_thread_pool_workers, - sleep, - ) - - if _is_python_36(): - run36(coro) - else: - asyncio.run(coro) - - -_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None - - -async def listen_keyboard_manual( - on_press: Optional[Callable[[str], Any]] = None, - on_release: Optional[Callable[[str], Any]] = None, - until: Optional[str] = "esc", - sequential: bool = False, - delay_second_char: float = 0.75, - delay_other_chars: float = 0.05, - lower: bool = True, - debug: bool = False, - max_thread_pool_workers: Optional[int] = None, - sleep: float = 0.01, -) -> None: - """The same as :func:`~sshkeyboard.listen_keyboard`, but now the - awaiting must be handled by the caller - - .. code-block:: python - - from sshkeyboard import listen_keyboard_manual - # ... - asyncio.run(listen_keyboard_manual(...)) - - is the same as - - .. code-block:: python - - from sshkeyboard import listen_keyboard - # ... - listen_keyboard(...) - - (Python version 3.6 which does not have `asyncio.run` is handled - differently internally) - - Has the same parameters as :func:`~sshkeyboard.listen_keyboard` - """ - - global _running - global _should_run - global _executor - # Check the system - assert sys.version_info >= (3, 6), ( - "sshkeyboard requires Python version 3.6+, you have " - f"{sys.version_info.major}.{sys.version_info.minor}" - ) - # Check the state - assert not _running, "Only one listener allowed at a time" - assert ( - not _should_run - ), "Should have ended listening properly the last time" - # Check the parameters - assert ( - on_press is not None or on_release is not None - ), "Either on_press or on_release should be defined" - _check_callback_ok(on_press, "on_press") - _check_callback_ok(on_release, "on_release") - assert until is None or isinstance( - until, str - ), "'until' has to be a string or None" - assert isinstance(sequential, bool), "'sequential' has to be boolean" - assert isinstance( - delay_second_char, (int, float) - ), "'delay_second_char' has to be numeric" - assert isinstance( - delay_other_chars, (int, float) - ), "'delay_other_chars' has to be numeric" - assert isinstance(lower, bool), "'lower' has to be boolean" - assert isinstance(debug, bool), "'debug' has to be boolean" - assert max_thread_pool_workers is None or isinstance( - max_thread_pool_workers, int - ), "'max_thread_pool_workers' has to be None or int" - assert isinstance(sleep, (int, float)), "'sleep' has to numeric" - - _running = True - _should_run = True - - # Create thread pool executor only if it will get used - # executor = None - if not sequential and ( - not asyncio.iscoroutinefunction(on_press) - or not asyncio.iscoroutinefunction(on_release) - ): - _executor = concurrent.futures.ThreadPoolExecutor( - max_workers=max_thread_pool_workers - ) - - # Package parameters into namespaces so they are easier to pass around - # Options do not change - options = SimpleNamespace( - on_press_callback=_callback(on_press, sequential, _executor), - on_release_callback=_callback(on_release, sequential, _executor), - until=until, - sequential=sequential, - delay_second_char=delay_second_char, - delay_other_chars=delay_other_chars, - lower=lower, - debug=debug, - sleep=sleep, - ) - # State does change - state = SimpleNamespace( - press_time=time(), - initial_press_time=time(), - previous="", - current="", - ) - - # Listen - with _raw(sys.stdin), _nonblocking(sys.stdin): - while _should_run: - state = await _react_to_input(state, options) - await asyncio.sleep(sleep) - - _clean_up() - - -def _clean_up(): - global _running - global _should_run - global _executor - # Cleanup - if _executor is not None: - _executor.shutdown() - _running = False - _should_run = False - - -def stop_listening() -> None: - """Stops the ongoing keyboard listeners - - Can be called inside the callbacks or from outside. Does not do anything - if listener is not running. - - Example to stop after some condition is met, ("z" pressed in this case): - - .. code-block:: python - - from sshkeyboard import listen_keyboard, stop_listening - - def press(key): - print(f"'{key}' pressed") - if key == "z": - stop_listening() - - listen_keyboard(on_press=press) - """ - if _running: - global _should_run - _should_run = False - _clean_up() - - -def _is_python_36(): - return sys.version_info.major == 3 and sys.version_info.minor == 6 - - -def _check_callback_ok(function, name): - if function is not None: - assert callable(function), f"{name} must be None or callable" - assert _takes_at_least_one_param( - function - ), f"{name} must take at least one parameter" - assert _max_one_param_without_default(function), ( - f"{name} must have one or zero parameters without a default " - f"value, now takes more: {_default_empty_params(function)}" - ) - - -def _takes_at_least_one_param(function): - sig = signature(function) - return len(sig.parameters.values()) >= 1 - - -def _default_empty_params(function): - sig = signature(function) - return tuple( - param.name - for param in sig.parameters.values() - if ( - param.kind == param.POSITIONAL_OR_KEYWORD - and param.default is param.empty - ) - ) - - -def _max_one_param_without_default(function): - default_empty_params = _default_empty_params(function) - return len(default_empty_params) <= 1 - - -def _done(task): - if not task.cancelled() and task.exception() is not None: - ex = task.exception() - traceback.print_exception(type(ex), ex, ex.__traceback__) - global _should_run - _should_run = False - - -def _callback(cb_function, sequential, executor): - async def _cb(key): - if cb_function is None: - return - - if sequential: - if asyncio.iscoroutinefunction(cb_function): - await cb_function(key) - else: - cb_function(key) - else: - if asyncio.iscoroutinefunction(cb_function): - task = asyncio.create_task(cb_function(key)) - task.add_done_callback(_done) - else: - future = executor.submit(cb_function, key) - future.add_done_callback(_done) - - return _cb - - -# Raw and _nonblocking inspiration from: -# http://ballingt.com/_nonblocking-stdin-in-python-3/ -@contextmanager -def _raw(stream): - # Not required on windows - if _is_windows: - yield - return - - original_stty = termios.tcgetattr(stream) - try: - tty.setcbreak(stream) - yield - finally: - termios.tcsetattr(stream, termios.TCSANOW, original_stty) - - -@contextmanager -def _nonblocking(stream): - # Not required on windows - if _is_windows: - yield - return - - fd = stream.fileno() - orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL) - try: - fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK) - yield - finally: - fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl) - - -def _read_char(debug): - if _is_windows: - return _read_char_win(debug) - else: - return _read_char_unix(debug) - - -def _read_char_win(debug): - # Return if nothing to read - if not msvcrt.kbhit(): - return "" - - char = msvcrt.getwch() - if char in _WIN_SPECIAL_CHAR_STARTS: - # Check if requires one more read - if char in _WIN_REQUIRES_TWO_READS_STARTS: - char += msvcrt.getwch() - - if char in _WIN_CHAR_TO_READABLE: - return _WIN_CHAR_TO_READABLE[char] - else: - if debug: - print(f"Non-supported win char: {repr(char)}") - return None - - # Change some character representations to readable strings - elif char in _CHAR_TO_READABLE: - char = _CHAR_TO_READABLE[char] - - return char - - -def _read_char_unix(debug): - char = _read_unix_stdin(1) - - # Skip and continue if read failed - if char is None: - return None - - # Handle any character - elif char != "": - # Read more if ansi character, skip and continue if unknown - if _is_unix_ansi(char): - char, raw = _read_and_parse_unix_ansi(char) - if char is None: - if debug: - print(f"Non-supported ansi char: {repr(raw)}") - return None - # Change some character representations to readable strings - elif char in _CHAR_TO_READABLE: - char = _CHAR_TO_READABLE[char] - - return char - - -def _read_unix_stdin(amount): - try: - return sys.stdin.read(amount) - except IOError: - return None - - -# '\x' at the start is a good indicator for ansi character -def _is_unix_ansi(char): - rep = repr(char) - return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x" - - -def _read_and_parse_unix_ansi(char): - char += _read_unix_stdin(5) - if char in _UNIX_ANSI_CHAR_TO_READABLE: - return _UNIX_ANSI_CHAR_TO_READABLE[char], char - else: - return None, char - - -async def _react_to_input(state, options): - # Read next character - state.current = _read_char(options.debug) - - # Skip and continue if read failed - if state.current is None: - return state - - # Handle any character - elif state.current != "": - - # Make lower case if requested - if options.lower: - state.current = state.current.lower() - - # Stop if until character has been read - if options.until is not None and state.current == options.until: - stop_listening() - return state - - # Release state.previous if new pressed - if state.previous != "" and state.current != state.previous: - await options.on_release_callback(state.previous) - # Weirdly on_release fires too late on Windows unless there is - # an extra sleep here when sequential=False... - if _is_windows and not options.sequential: - await asyncio.sleep(options.sleep) - - # Press if new character, update state.previous - if state.current != state.previous: - await options.on_press_callback(state.current) - state.initial_press_time = time() - state.previous = state.current - - # Update press time - if state.current == state.previous: - state.press_time = time() - - # Handle empty - # - Release the state.previous character if nothing is read - # and enough time has passed - # - The second character comes slower than the rest on terminal - elif state.previous != "" and ( - time() - state.initial_press_time > options.delay_second_char - and time() - state.press_time > options.delay_other_chars - ): - await options.on_release_callback(state.previous) - state.previous = state.current - - return state - - -if __name__ == "__main__": - - async def press(key): - print(f"'{key}' pressed") - - def release(key): - print(f"'{key}' released") - - # Sync version - print("listening_keyboard(), press keys, and press 'esc' to exit") - listen_keyboard(on_press=press, on_release=release) - # ^this is the same as - # asyncio.run(listen_keyboard_manual(press, release)) +from .key_listener import * diff --git a/src/sshkeyboard/__init__.pyi b/src/sshkeyboard/__init__.pyi deleted file mode 100644 index 1d295b6..0000000 --- a/src/sshkeyboard/__init__.pyi +++ /dev/null @@ -1,69 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -# import asyncio -# import concurrent.futures -# import fcntl -# import msvcrt -# import os -# import sys -# import termios -# import traceback -# import tty -# from contextlib import contextmanager -# from inspect import signature -# from platform import system -# from time import time -# from types import SimpleNamespace -from typing import Any, Callable, Optional, TypeAlias - -"""sshkeyboard""" -__version__: str = ... -_is_windows: bool = ... -_running: bool = ... -_should_run: bool = ... -_UNIX_ANSI_CHAR_TO_READABLE: dict[str, str] = ... -_WIN_CHAR_TO_READABLE: dict[str, str] = ... -_CHAR_TO_READABLE: dict[str, str] = ... -_WIN_SPECIAL_CHAR_STARTS: set[str] = ... -_WIN_REQUIRES_TWO_READS_STARTS: set[str] = ... -Key: TypeAlias = str - -def listen_keyboard( - on_press: Optional[Callable[[str], Any]] = ..., - on_release: Optional[Callable[[str], Any]] = ..., - until: Optional[str] = ..., - sequential: bool = ..., - delay_second_char: float = ..., - delay_other_chars: float = ..., - lower: bool = ..., - debug: bool = ..., - max_thread_pool_workers: Optional[int] = ..., - sleep: float = ..., -) -> None: - """Listen for keyboard events and f""" - ... - -async def listen_keyboard_manual( - on_press: Optional[Callable[[str], Any]] = ..., - on_release: Optional[Callable[[str], Any]] = ..., - until: Optional[str] = ..., - sequential: bool = ..., - delay_second_char: float = ..., - delay_other_chars: float = ..., - lower: bool = ..., - debug: bool = ..., - max_thread_pool_workers: Optional[int] = ..., - sleep: float = ..., -) -> None: - """The same as :func:`~sshkeyboard.""" - ... - -def stop_listening() -> None: - """Stops the ongoing keyboard listener.""" - ... - -if __name__ == "__main__": - async def press(key): ... - def release(key): ... diff --git a/src/sshkeyboard/key_listener.py b/src/sshkeyboard/key_listener.py new file mode 100644 index 0000000..347a432 --- /dev/null +++ b/src/sshkeyboard/key_listener.py @@ -0,0 +1,557 @@ +"""Key handler module.""" + + +# Readable representations for selected ansi characters +# All possible ansi characters here: +# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/input/ansi_escape_sequences.py +# Listener does not support modifier keys for now +import asyncio +import concurrent.futures +import os +import sys +from contextlib import contextmanager +from dataclasses import InitVar, dataclass +from platform import system +from time import time +from typing import ( + IO, + Any, + Awaitable, + Callable, + Coroutine, + Dict, + List, + Optional, + TypeAlias, + Union, +) + +try: + from ._asyncio_run_backport_36 import run36 +except ImportError: # this allows local testing: python __init__.py + from _asyncio_run_backport_36 import run36 # type:ignore + +_is_windows = system().lower() == "windows" + +if _is_windows: + import msvcrt +else: + import fcntl + import termios + import tty + +Key: TypeAlias = str +CallbackFunction: TypeAlias = Callable[[Key], Any] + +_UNIX_ANSI_CHAR_TO_READABLE = { + # 'Regular' characters + "\x1b": "esc", + "\x7f": "backspace", + "\x1b[2~": "insert", + "\x1b[3~": "delete", + "\x1b[5~": "pageup", + "\x1b[6~": "pagedown", + "\x1b[H": "home", + "\x1b[F": "end", + "\x1b[A": "up", + "\x1b[B": "down", + "\x1b[C": "right", + "\x1b[D": "left", + "\x1bOP": "f1", + "\x1bOQ": "f2", + "\x1bOR": "f3", + "\x1bOS": "f4", + "\x1b[15~": "f5", + "\x1b[17~": "f6", + "\x1b[18~": "f7", + "\x1b[19~": "f8", + "\x1b[20~": "f9", + "\x1b[21~": "f10", + "\x1b[23~": "f11", + "\x1b[24~": "f12", + "\x1b[25~": "f13", + "\x1b[26~": "f14", + "\x1b[28~": "f15", + "\x1b[29~": "f16", + "\x1b[31~": "f17", + "\x1b[32~": "f18", + "\x1b[33~": "f19", + "\x1b[34~": "f20", + # Special/duplicate: + # Tmux, Emacs + "\x1bOH": "home", + "\x1bOF": "end", + "\x1bOA": "up", + "\x1bOB": "down", + "\x1bOC": "right", + "\x1bOD": "left", + # Rrvt + "\x1b[1~": "home", + "\x1b[4~": "end", + "\x1b[11~": "f1", + "\x1b[12~": "f2", + "\x1b[13~": "f3", + "\x1b[14~": "f4", + # Linux console + "\x1b[[A": "f1", + "\x1b[[B": "f2", + "\x1b[[C": "f3", + "\x1b[[D": "f4", + "\x1b[[E": "f5", + # Xterm + "\x1b[1;2P": "f13", + "\x1b[1;2Q": "f14", + "\x1b[1;2S": "f16", + "\x1b[15;2~": "f17", + "\x1b[17;2~": "f18", + "\x1b[18;2~": "f19", + "\x1b[19;2~": "f20", + "\x1b[20;2~": "f21", + "\x1b[21;2~": "f22", + "\x1b[23;2~": "f23", + "\x1b[24;2~": "f24", +} + +_WIN_CHAR_TO_READABLE = { + "\x1b": "esc", + "\x08": "backspace", + "àR": "insert", + "àS": "delete", + "àI": "pageup", + "àQ": "pagedown", + "àG": "home", + "àO": "end", + "àH": "up", + "àP": "down", + "àM": "right", + "àK": "left", + "\x00;": "f1", + "\x00<": "f2", + "\x00=": "f3", + "\x00>": "f4", + "\x00?": "f5", + "\x00@": "f6", + "\x00A": "f7", + "\x00B": "f8", + "\x00C": "f9", + "\x00D": "f10", + # "": "f11", ? + "à†": "f12", +} + +# Some non-ansi characters that need a readable representation +_CHAR_TO_READABLE = { + "\t": "tab", + "\n": "enter", + "\r": "enter", + " ": "space", +} + +_WIN_SPECIAL_CHAR_STARTS = {"\x1b", "\x08", "\x00", "\xe0"} +_WIN_REQUIRES_TWO_READS_STARTS = {"\x00", "\xe0"} + + +def _is_python_36(): + return sys.version_info.major == 3 and sys.version_info.minor == 6 + + +def _done( + task: Union[asyncio.Future[Any], concurrent.futures.Future[None]] +) -> None: + if not task.cancelled(): + ex = task.exception() + if ex is not None: + raise ex + # traceback.print_exception(type(ex), ex, ex.__traceback__) + + +# Raw and _nonblocking inspiration from: +# http://ballingt.com/_nonblocking-stdin-in-python-3/ +@contextmanager +def _raw(stream: IO[Any]): + # Not required on windows + if _is_windows: + yield + return + + original_stty: List[Any] = termios.tcgetattr(stream) # type: ignore + try: + tty.setcbreak(stream) # type: ignore + yield + finally: + termios.tcsetattr( # type: ignore + stream, termios.TCSANOW, original_stty # type: ignore + ) + + +@contextmanager +def _nonblocking(stream: IO[Any]): + # Not required on windows + if _is_windows: + yield + return + + fd = stream.fileno() + orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL) # type: ignore + try: + fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK) # type: ignore + yield + finally: + fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl) # type: ignore + + +class MultipleListenerError(Exception): + pass + + +def _read_char(debug: bool) -> Optional[str]: + if _is_windows: + return _read_char_win(debug) + else: + return _read_char_unix(debug) + + +def _read_char_win(debug: bool) -> Optional[str]: + # Return if nothing to read + if not msvcrt.kbhit(): + return "" + + char = msvcrt.getwch() + if char in _WIN_SPECIAL_CHAR_STARTS: + # Check if requires one more read + if char in _WIN_REQUIRES_TWO_READS_STARTS: + char += msvcrt.getwch() + + if char in _WIN_CHAR_TO_READABLE: + return _WIN_CHAR_TO_READABLE[char] + else: + if debug: + print(f"Non-supported win char: {repr(char)}") + return None + + # Change some character representations to readable strings + elif char in _CHAR_TO_READABLE: + char = _CHAR_TO_READABLE[char] + + return char + + +def _read_unix_stdin(amount: int) -> str: + try: + return sys.stdin.read(amount) + except IOError: + return "" + + +# '\x' at the start is a good indicator for ansi character +def _is_unix_ansi(char: str): + rep = repr(char) + return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x" + + +def _read_and_parse_unix_ansi(char: str): + char += _read_unix_stdin(5) + if char in _UNIX_ANSI_CHAR_TO_READABLE: + return _UNIX_ANSI_CHAR_TO_READABLE[char], char + else: + return None, char + + +def _read_char_unix(debug: bool) -> Optional[str]: + raw: Any + char: Optional[str] = _read_unix_stdin(1) + + # Skip and continue if read failed + if char is None: + return None + + # Handle any character + elif char != "": + # Read more if ansi character, skip and continue if unknown + if _is_unix_ansi(char): + char, raw = _read_and_parse_unix_ansi(char) + if char is None: + if debug: + print(f"Non-supported ansi char: {repr(raw)}") + return None + # Change some character representations to readable strings + elif char in _CHAR_TO_READABLE: + char = _CHAR_TO_READABLE[char] + + return char + + +@dataclass +class KeyListener: + """Key listener class. + + Args: + on_press: Function that gets called when a key is pressed. The + function takes the pressed key as parameter. Defaults to None. + on_release: Function that gets called when a key is released. The + function takes the released key as parameter. Defaults to None. + until: A key that will end keyboard listening. None means that + listening will stop only when :func:`~sshkeyboard.stop_listening` + has been called or an error has been raised. Defaults to "esc". + sequential: If enabled, callbacks will be forced to happen one by + one instead of concurrently or asynchronously. Defaults to False. + delay_second_char: The timeout between first and second character when + holding down a key. Depends on terminal and is used for parsing + the input. Defaults to 0.75. + delay_other_chars: The timeout between all other characters when + holding down a key. Depends on terminal and is used for parsing + the input. Defaults to 0.05. + lower: If enabled, the callback 'key' parameter gets turned into lower + case key even if it was upper case, for example "A" -> "a". + Defaults to True. + debug: Print debug messages. Defaults to False. + max_thread_pool_workers: Define the number of workers in + ThreadPoolExecutor, None means that a default value will get used. + Will get ignored if sequential=True. Defaults to None. + sleep: asyncio.sleep() amount between attempted keyboard input reads. + Defaults to 0.01. + """ + + on_press_func: InitVar[Optional[CallbackFunction]] = None + on_release_func: InitVar[Optional[CallbackFunction]] = None + until: Optional[str] = "esc" + sequential: bool = False + delay_second_char: float = 0.75 + delay_other_chars: float = 0.05 + lower: bool = True + debug: bool = False + max_thread_pool_workers: Optional[int] = None + sleep: float = 0.01 + + def __post_init__( + self, + on_press_func: Optional[CallbackFunction], + on_release_func: Optional[CallbackFunction], + ) -> None: + # Check the system + if sys.version_info < (3, 6): + raise RuntimeError( + "sshkeyboard requires Python version 3.6+, you have " + f"{sys.version_info.major}.{sys.version_info.minor}" + ) + self.on_press = self._callback(on_press_func) + self.on_release = self._callback(on_release_func) + self.executor: Optional[concurrent.futures.ThreadPoolExecutor] = None + self._running = False + # Create thread pool executor only if it will get used + # executor = None + if not self.sequential and ( + not asyncio.iscoroutinefunction(self.on_press) + or not asyncio.iscoroutinefunction(self.on_release) + ): + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_thread_pool_workers + ) + + def _callback( + self, cb_function: Optional[CallbackFunction] + ) -> Callable[[str], Coroutine[Any, Any, Any]]: + async def _cb(key: Key): + if cb_function is None: + return + + if self.sequential or self.executor is None: + if asyncio.iscoroutinefunction(cb_function): + if isinstance(cb_function, Awaitable): + await cb_function(key) + else: + cb_function(key) + else: + if asyncio.iscoroutinefunction(cb_function): + task: asyncio.Task[None] = asyncio.create_task( + cb_function(key) + ) + task.add_done_callback(_done) + else: + future = self.executor.submit(cb_function, key) + future.add_done_callback(_done) + + return _cb + + @property + def running(self) -> bool: + return self._running + + def listen_keyboard( + self, + ) -> None: + + """Listen for keyboard events and fire `on_press` and `on_release` + callback functions + + Supports asynchronous callbacks also. + + Blocks the thread until the key in `until` parameter has been pressed, + an error has been raised or :func:`~sshkeyboard.stop_listening` has + been called. + + Simple example with asynchronous and regular callbacks: + + .. code-block:: python + + from sshkeyboard import listen_keyboard + + async def press(key): + print(f"'{key}' pressed") + + def release(key): + print(f"'{key}' released") + + listen_keyboard( + on_press=press, + on_release=release, + ) + """ + + coro = self.listen_keyboard_manual() + + if _is_python_36(): + run36(coro) + else: + asyncio.run(coro) + + async def listen_keyboard_manual( + self, + ) -> None: + """The same as :func:`~sshkeyboard.listen_keyboard`, but now the + awaiting must be handled by the caller + + .. code-block:: python + + from sshkeyboard import listen_keyboard_manual + + # ... + asyncio.run(listen_keyboard_manual(...)) + + is the same as + + .. code-block:: python + + from sshkeyboard import listen_keyboard + + # ... + listen_keyboard(...) + + (Python version 3.6 which does not have `asyncio.run` is handled + differently internally) + + Has the same parameters as :func:`~sshkeyboard.listen_keyboard` + """ + if self.running: + raise MultipleListenerError("Listener is already running.") + self._running = True + if self.on_press is None and self.on_release is None: + raise ValueError("Either on_press or on_release should be defined") + # Listen + with _raw(sys.stdin), _nonblocking(sys.stdin): + state: Dict[str, Any] = {} + while self.running: + state = await self._react_to_input(**state) + await asyncio.sleep(self.sleep) + + self._clean_up() + + async def _react_to_input( + self, + press_time: float = time(), + initial_press_time: float = time(), + previous: str = "", + current: Optional[str] = "", + ) -> Dict[str, Any]: + # Read next character + current = _read_char(self.debug) + + # Skip and continue if read failed + if current is None: + return { + "press_time": press_time, + "initial_press_time": initial_press_time, + "previous": previous, + "current": current, + } + + # Handle any character + elif current != "": + + # Make lower case if requested + if self.lower: + current = current.lower() + + # Stop if until character has been read + if self.until is not None and current == self.until: + self.stop_listening() + return { + "press_time": press_time, + "initial_press_time": initial_press_time, + "previous": previous, + "current": current, + } + + # Release previous if new pressed + if previous != "" and current != previous: + await self.on_release(previous) + # Weirdly on_release fires too late on Windows unless there is + # an extra sleep here when sequential=False... + if _is_windows and not self.sequential: + await asyncio.sleep(self.sleep) + + # Press if new character, update state.previous + if current != previous: + await self.on_press(current) + initial_press_time = time() + previous = current + + # Update press time + if current == previous: + press_time = time() + + # Handle empty + # - Release the state.previous character if nothing is read + # and enough time has passed + # - The second character comes slower than the rest on terminal + elif previous != "" and ( + time() - initial_press_time > self.delay_second_char + and time() - press_time > self.delay_other_chars + ): + await self.on_release(previous) + previous = current + + return { + "press_time": press_time, + "initial_press_time": initial_press_time, + "previous": previous, + "current": current, + } + + def _clean_up(self): + if self.executor is not None: + self.executor.shutdown() + self._running = False + + def stop_listening(self) -> None: + """Stops the ongoing keyboard listeners + + Can be called inside the callbacks or from outside. Does not do + anything if listener is not running. + + Example to stop after some condition is met, + ("z" pressed in this case): + + .. code-block:: python + + from sshkeyboard import listen_keyboard, stop_listening + + def press(key): + print(f"'{key}' pressed") + if key == "z": + stop_listening() + + listen_keyboard(on_press=press) + """ + if self.running: + self._clean_up() From 563ffff65c9f1579a8ffdb3cd5668753671de6df Mon Sep 17 00:00:00 2001 From: rbroderi Date: Sun, 17 Apr 2022 20:35:02 -0400 Subject: [PATCH 3/6] further cleanup and standardize --- src/sshkeyboard/__init__.py | 2 +- src/sshkeyboard/char_maps.py | 114 ++++++++++++ src/sshkeyboard/char_reader.py | 97 ++++++++++ src/sshkeyboard/context_managers.py | 48 +++++ src/sshkeyboard/errors.py | 5 + src/sshkeyboard/key_listener.py | 278 ++-------------------------- 6 files changed, 285 insertions(+), 259 deletions(-) create mode 100644 src/sshkeyboard/char_maps.py create mode 100644 src/sshkeyboard/char_reader.py create mode 100644 src/sshkeyboard/context_managers.py create mode 100644 src/sshkeyboard/errors.py diff --git a/src/sshkeyboard/__init__.py b/src/sshkeyboard/__init__.py index 857ad8a..d276f03 100644 --- a/src/sshkeyboard/__init__.py +++ b/src/sshkeyboard/__init__.py @@ -2,4 +2,4 @@ __version__ = "3.0.0" -from .key_listener import * +from .key_listener import Key, KeyListener diff --git a/src/sshkeyboard/char_maps.py b/src/sshkeyboard/char_maps.py new file mode 100644 index 0000000..989c878 --- /dev/null +++ b/src/sshkeyboard/char_maps.py @@ -0,0 +1,114 @@ +"""Holds character mapping constants.""" +from typing import Final + +# Readable representations for selected ansi characters +# All possible ansi characters here: +# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/input/ansi_escape_sequences.py +# Listener does not support modifier keys for now + +UNIX_ANSI_CHAR_TO_READABLE: Final = { + # 'Regular' characters + "\x1b": "esc", + "\x7f": "backspace", + "\x1b[2~": "insert", + "\x1b[3~": "delete", + "\x1b[5~": "pageup", + "\x1b[6~": "pagedown", + "\x1b[H": "home", + "\x1b[F": "end", + "\x1b[A": "up", + "\x1b[B": "down", + "\x1b[C": "right", + "\x1b[D": "left", + "\x1bOP": "f1", + "\x1bOQ": "f2", + "\x1bOR": "f3", + "\x1bOS": "f4", + "\x1b[15~": "f5", + "\x1b[17~": "f6", + "\x1b[18~": "f7", + "\x1b[19~": "f8", + "\x1b[20~": "f9", + "\x1b[21~": "f10", + "\x1b[23~": "f11", + "\x1b[24~": "f12", + "\x1b[25~": "f13", + "\x1b[26~": "f14", + "\x1b[28~": "f15", + "\x1b[29~": "f16", + "\x1b[31~": "f17", + "\x1b[32~": "f18", + "\x1b[33~": "f19", + "\x1b[34~": "f20", + # Special/duplicate: + # Tmux, Emacs + "\x1bOH": "home", + "\x1bOF": "end", + "\x1bOA": "up", + "\x1bOB": "down", + "\x1bOC": "right", + "\x1bOD": "left", + # Rrvt + "\x1b[1~": "home", + "\x1b[4~": "end", + "\x1b[11~": "f1", + "\x1b[12~": "f2", + "\x1b[13~": "f3", + "\x1b[14~": "f4", + # Linux console + "\x1b[[A": "f1", + "\x1b[[B": "f2", + "\x1b[[C": "f3", + "\x1b[[D": "f4", + "\x1b[[E": "f5", + # Xterm + "\x1b[1;2P": "f13", + "\x1b[1;2Q": "f14", + "\x1b[1;2S": "f16", + "\x1b[15;2~": "f17", + "\x1b[17;2~": "f18", + "\x1b[18;2~": "f19", + "\x1b[19;2~": "f20", + "\x1b[20;2~": "f21", + "\x1b[21;2~": "f22", + "\x1b[23;2~": "f23", + "\x1b[24;2~": "f24", +} + +WIN_CHAR_TO_READABLE: Final = { + "\x1b": "esc", + "\x08": "backspace", + "àR": "insert", + "àS": "delete", + "àI": "pageup", + "àQ": "pagedown", + "àG": "home", + "àO": "end", + "àH": "up", + "àP": "down", + "àM": "right", + "àK": "left", + "\x00;": "f1", + "\x00<": "f2", + "\x00=": "f3", + "\x00>": "f4", + "\x00?": "f5", + "\x00@": "f6", + "\x00A": "f7", + "\x00B": "f8", + "\x00C": "f9", + "\x00D": "f10", + # "": "f11", ? + "à†": "f12", +} + +# Some non-ansi characters that need a readable representation +CHAR_TO_READABLE: Final = { + "\t": "tab", + "\n": "enter", + "\r": "enter", + " ": "space", +} + +WIN_SPECIAL_CHAR_STARTS: Final = {"\x1b", "\x08", "\x00", "\xe0"} +WIN_REQUIRES_TWO_READS_STARTS: Final = {"\x00", "\xe0"} diff --git a/src/sshkeyboard/char_reader.py b/src/sshkeyboard/char_reader.py new file mode 100644 index 0000000..1f6ffab --- /dev/null +++ b/src/sshkeyboard/char_reader.py @@ -0,0 +1,97 @@ +"""Platform independent char reader.""" +from __future__ import annotations + +import sys +from abc import ABC, abstractmethod +from platform import system +from typing import Any, Optional + +from . import char_maps as cmap + +_is_windows = system().lower() == "windows" + +if _is_windows: + import msvcrt + + +class CharReader(ABC): + @abstractmethod + def read(self, debug: bool = False) -> Optional[str]: + ... + + +class WinCharReader(CharReader): + def read(self, debug: bool = False) -> Optional[str]: + # Return if nothing to read + if not msvcrt.kbhit(): + return "" + + char = msvcrt.getwch() + if char in cmap.WIN_SPECIAL_CHAR_STARTS: + # Check if requires one more read + if char in cmap.WIN_REQUIRES_TWO_READS_STARTS: + char += msvcrt.getwch() + + if char in cmap.WIN_CHAR_TO_READABLE: + return cmap.WIN_CHAR_TO_READABLE[char] + else: + if debug: + print(f"Non-supported win char: {repr(char)}") + return None + + # Change some character representations to readable strings + elif char in cmap.CHAR_TO_READABLE: + char = cmap.CHAR_TO_READABLE[char] + + return char + + +class UnixCharReader(CharReader): + def read(self, debug: bool = False) -> Optional[str]: + raw: Any + char: Optional[str] = self._read_unix_stdin(1) + + # Skip and continue if read failed + if char is None: + return None + + # Handle any character + elif char != "": + # Read more if ansi character, skip and continue if unknown + if self._is_unix_ansi(char): + char, raw = self._read_and_parse_unix_ansi(char) + if char is None: + if debug: + print(f"Non-supported ansi char: {repr(raw)}") + return None + # Change some character representations to readable strings + elif char in cmap.CHAR_TO_READABLE: + char = cmap.CHAR_TO_READABLE[char] + + return char + + def _read_unix_stdin(self, amount: int) -> str: + try: + return sys.stdin.read(amount) + except IOError: + return "" + + # '\x' at the start is a good indicator for ansi character + def _is_unix_ansi(self, char: str): + rep = repr(char) + return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x" + + def _read_and_parse_unix_ansi(self, char: str): + char += self._read_unix_stdin(5) + if char in cmap.UNIX_ANSI_CHAR_TO_READABLE: + return cmap.UNIX_ANSI_CHAR_TO_READABLE[char], char + else: + return None, char + + +class CharReaderFactory: + @staticmethod + def create() -> CharReader: + if system().lower() == "windows": + return WinCharReader() + return UnixCharReader() diff --git a/src/sshkeyboard/context_managers.py b/src/sshkeyboard/context_managers.py new file mode 100644 index 0000000..86e94ed --- /dev/null +++ b/src/sshkeyboard/context_managers.py @@ -0,0 +1,48 @@ +"""Holds context managers.""" + +# Raw and _nonblocking inspiration from: +# http://ballingt.com/_nonblocking-stdin-in-python-3/ +import os +from contextlib import contextmanager +from platform import system +from typing import IO, Any, List + +_is_windows = system().lower() == "windows" + +if not _is_windows: + import fcntl + import termios + import tty + + +@contextmanager +def raw(stream: IO[Any]): + # Not required on windows + if _is_windows: + yield + return + + original_stty: List[Any] = termios.tcgetattr(stream) # type: ignore + try: + tty.setcbreak(stream) # type: ignore + yield + finally: + termios.tcsetattr( # type: ignore + stream, termios.TCSANOW, original_stty # type: ignore + ) + + +@contextmanager +def nonblocking(stream: IO[Any]): + # Not required on windows + if _is_windows: + yield + return + + fd = stream.fileno() + orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL) # type: ignore + try: + fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK) # type: ignore + yield + finally: + fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl) # type: ignore diff --git a/src/sshkeyboard/errors.py b/src/sshkeyboard/errors.py new file mode 100644 index 0000000..14e1d68 --- /dev/null +++ b/src/sshkeyboard/errors.py @@ -0,0 +1,5 @@ +"""Module that holds custom error classes.""" + + +class MultipleListenerError(Exception): + pass diff --git a/src/sshkeyboard/key_listener.py b/src/sshkeyboard/key_listener.py index 347a432..df9a8c3 100644 --- a/src/sshkeyboard/key_listener.py +++ b/src/sshkeyboard/key_listener.py @@ -1,284 +1,34 @@ """Key handler module.""" - - -# Readable representations for selected ansi characters -# All possible ansi characters here: -# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/input/ansi_escape_sequences.py -# Listener does not support modifier keys for now import asyncio import concurrent.futures -import os import sys -from contextlib import contextmanager from dataclasses import InitVar, dataclass from platform import system from time import time from typing import ( - IO, Any, Awaitable, Callable, Coroutine, Dict, - List, Optional, TypeAlias, Union, ) +from .char_reader import CharReaderFactory +from .context_managers import nonblocking, raw +from .errors import MultipleListenerError + try: from ._asyncio_run_backport_36 import run36 except ImportError: # this allows local testing: python __init__.py from _asyncio_run_backport_36 import run36 # type:ignore -_is_windows = system().lower() == "windows" - -if _is_windows: - import msvcrt -else: - import fcntl - import termios - import tty Key: TypeAlias = str CallbackFunction: TypeAlias = Callable[[Key], Any] - -_UNIX_ANSI_CHAR_TO_READABLE = { - # 'Regular' characters - "\x1b": "esc", - "\x7f": "backspace", - "\x1b[2~": "insert", - "\x1b[3~": "delete", - "\x1b[5~": "pageup", - "\x1b[6~": "pagedown", - "\x1b[H": "home", - "\x1b[F": "end", - "\x1b[A": "up", - "\x1b[B": "down", - "\x1b[C": "right", - "\x1b[D": "left", - "\x1bOP": "f1", - "\x1bOQ": "f2", - "\x1bOR": "f3", - "\x1bOS": "f4", - "\x1b[15~": "f5", - "\x1b[17~": "f6", - "\x1b[18~": "f7", - "\x1b[19~": "f8", - "\x1b[20~": "f9", - "\x1b[21~": "f10", - "\x1b[23~": "f11", - "\x1b[24~": "f12", - "\x1b[25~": "f13", - "\x1b[26~": "f14", - "\x1b[28~": "f15", - "\x1b[29~": "f16", - "\x1b[31~": "f17", - "\x1b[32~": "f18", - "\x1b[33~": "f19", - "\x1b[34~": "f20", - # Special/duplicate: - # Tmux, Emacs - "\x1bOH": "home", - "\x1bOF": "end", - "\x1bOA": "up", - "\x1bOB": "down", - "\x1bOC": "right", - "\x1bOD": "left", - # Rrvt - "\x1b[1~": "home", - "\x1b[4~": "end", - "\x1b[11~": "f1", - "\x1b[12~": "f2", - "\x1b[13~": "f3", - "\x1b[14~": "f4", - # Linux console - "\x1b[[A": "f1", - "\x1b[[B": "f2", - "\x1b[[C": "f3", - "\x1b[[D": "f4", - "\x1b[[E": "f5", - # Xterm - "\x1b[1;2P": "f13", - "\x1b[1;2Q": "f14", - "\x1b[1;2S": "f16", - "\x1b[15;2~": "f17", - "\x1b[17;2~": "f18", - "\x1b[18;2~": "f19", - "\x1b[19;2~": "f20", - "\x1b[20;2~": "f21", - "\x1b[21;2~": "f22", - "\x1b[23;2~": "f23", - "\x1b[24;2~": "f24", -} - -_WIN_CHAR_TO_READABLE = { - "\x1b": "esc", - "\x08": "backspace", - "àR": "insert", - "àS": "delete", - "àI": "pageup", - "àQ": "pagedown", - "àG": "home", - "àO": "end", - "àH": "up", - "àP": "down", - "àM": "right", - "àK": "left", - "\x00;": "f1", - "\x00<": "f2", - "\x00=": "f3", - "\x00>": "f4", - "\x00?": "f5", - "\x00@": "f6", - "\x00A": "f7", - "\x00B": "f8", - "\x00C": "f9", - "\x00D": "f10", - # "": "f11", ? - "à†": "f12", -} - -# Some non-ansi characters that need a readable representation -_CHAR_TO_READABLE = { - "\t": "tab", - "\n": "enter", - "\r": "enter", - " ": "space", -} - -_WIN_SPECIAL_CHAR_STARTS = {"\x1b", "\x08", "\x00", "\xe0"} -_WIN_REQUIRES_TWO_READS_STARTS = {"\x00", "\xe0"} - - -def _is_python_36(): - return sys.version_info.major == 3 and sys.version_info.minor == 6 - - -def _done( - task: Union[asyncio.Future[Any], concurrent.futures.Future[None]] -) -> None: - if not task.cancelled(): - ex = task.exception() - if ex is not None: - raise ex - # traceback.print_exception(type(ex), ex, ex.__traceback__) - - -# Raw and _nonblocking inspiration from: -# http://ballingt.com/_nonblocking-stdin-in-python-3/ -@contextmanager -def _raw(stream: IO[Any]): - # Not required on windows - if _is_windows: - yield - return - - original_stty: List[Any] = termios.tcgetattr(stream) # type: ignore - try: - tty.setcbreak(stream) # type: ignore - yield - finally: - termios.tcsetattr( # type: ignore - stream, termios.TCSANOW, original_stty # type: ignore - ) - - -@contextmanager -def _nonblocking(stream: IO[Any]): - # Not required on windows - if _is_windows: - yield - return - - fd = stream.fileno() - orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL) # type: ignore - try: - fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK) # type: ignore - yield - finally: - fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl) # type: ignore - - -class MultipleListenerError(Exception): - pass - - -def _read_char(debug: bool) -> Optional[str]: - if _is_windows: - return _read_char_win(debug) - else: - return _read_char_unix(debug) - - -def _read_char_win(debug: bool) -> Optional[str]: - # Return if nothing to read - if not msvcrt.kbhit(): - return "" - - char = msvcrt.getwch() - if char in _WIN_SPECIAL_CHAR_STARTS: - # Check if requires one more read - if char in _WIN_REQUIRES_TWO_READS_STARTS: - char += msvcrt.getwch() - - if char in _WIN_CHAR_TO_READABLE: - return _WIN_CHAR_TO_READABLE[char] - else: - if debug: - print(f"Non-supported win char: {repr(char)}") - return None - - # Change some character representations to readable strings - elif char in _CHAR_TO_READABLE: - char = _CHAR_TO_READABLE[char] - - return char - - -def _read_unix_stdin(amount: int) -> str: - try: - return sys.stdin.read(amount) - except IOError: - return "" - - -# '\x' at the start is a good indicator for ansi character -def _is_unix_ansi(char: str): - rep = repr(char) - return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x" - - -def _read_and_parse_unix_ansi(char: str): - char += _read_unix_stdin(5) - if char in _UNIX_ANSI_CHAR_TO_READABLE: - return _UNIX_ANSI_CHAR_TO_READABLE[char], char - else: - return None, char - - -def _read_char_unix(debug: bool) -> Optional[str]: - raw: Any - char: Optional[str] = _read_unix_stdin(1) - - # Skip and continue if read failed - if char is None: - return None - - # Handle any character - elif char != "": - # Read more if ansi character, skip and continue if unknown - if _is_unix_ansi(char): - char, raw = _read_and_parse_unix_ansi(char) - if char is None: - if debug: - print(f"Non-supported ansi char: {repr(raw)}") - return None - # Change some character representations to readable strings - elif char in _CHAR_TO_READABLE: - char = _CHAR_TO_READABLE[char] - - return char +_is_windows = system().lower() == "windows" @dataclass @@ -338,6 +88,7 @@ def __post_init__( self.on_release = self._callback(on_release_func) self.executor: Optional[concurrent.futures.ThreadPoolExecutor] = None self._running = False + self.char_reader = CharReaderFactory.create() # Create thread pool executor only if it will get used # executor = None if not self.sequential and ( @@ -352,6 +103,16 @@ def _callback( self, cb_function: Optional[CallbackFunction] ) -> Callable[[str], Coroutine[Any, Any, Any]]: async def _cb(key: Key): + def _done( + task: Union[ + asyncio.Future[Any], concurrent.futures.Future[None] + ] + ) -> None: + if not task.cancelled(): + ex = task.exception() + if ex is not None: + raise ex + if cb_function is None: return @@ -410,7 +171,8 @@ def release(key): coro = self.listen_keyboard_manual() - if _is_python_36(): + # if python is version 3.6 + if sys.version_info.major == 3 and sys.version_info.minor == 6: run36(coro) else: asyncio.run(coro) @@ -448,7 +210,7 @@ async def listen_keyboard_manual( if self.on_press is None and self.on_release is None: raise ValueError("Either on_press or on_release should be defined") # Listen - with _raw(sys.stdin), _nonblocking(sys.stdin): + with raw(sys.stdin), nonblocking(sys.stdin): state: Dict[str, Any] = {} while self.running: state = await self._react_to_input(**state) @@ -464,7 +226,7 @@ async def _react_to_input( current: Optional[str] = "", ) -> Dict[str, Any]: # Read next character - current = _read_char(self.debug) + current = self.char_reader.read(self.debug) # Skip and continue if read failed if current is None: From a25f604ba52ae786faf4f89e8d7ed3adf7bcece7 Mon Sep 17 00:00:00 2001 From: rbroderi Date: Wed, 20 Apr 2022 19:36:17 -0400 Subject: [PATCH 4/6] added optional support for beartype runtime type checking to match original intention of assert calls --- src/sshkeyboard/key_listener.py | 51 ++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/src/sshkeyboard/key_listener.py b/src/sshkeyboard/key_listener.py index df9a8c3..f1627af 100644 --- a/src/sshkeyboard/key_listener.py +++ b/src/sshkeyboard/key_listener.py @@ -5,16 +5,45 @@ from dataclasses import InitVar, dataclass from platform import system from time import time -from typing import ( - Any, - Awaitable, - Callable, - Coroutine, - Dict, - Optional, - TypeAlias, - Union, -) +from typing import TypeAlias, TypeVar + +try: + # Import PEP-agnostic type hints from "beartype.typing", a stand-in + # replacement for the standard "typing" module providing improved forward + # compatibility with future Python releases. + # type ignore for pylance error, opened issue: + # https://github.com/beartype/beartype/issues/126 + from beartype import beartype # type: ignore + + # TypeAlias, seems to cause issues with pylance + # opened issue https://github.com/beartype/beartype/issues/127 + # from beartype.typing import ( + # Any, + # Awaitable, + # Callable, + # Coroutine, + # Dict, + # Optional, + # Union, + # ) +except ModuleNotFoundError: + from typing import ( + Any, + Awaitable, + Callable, + Coroutine, + Dict, + Optional, + Union, + ) + + FuncT = TypeVar("FuncT", bound=Callable[..., Any]) + + def noop_dec(func: FuncT) -> FuncT: + return func + + beartype: Callable[..., Any] = noop_dec # type: ignore[no-redef] + from .char_reader import CharReaderFactory from .context_managers import nonblocking, raw @@ -31,6 +60,8 @@ _is_windows = system().lower() == "windows" +# runtime type check, similar to original assert but more robust. +@beartype @dataclass class KeyListener: """Key listener class. From ba81d08766feb2c350bc970f50986c66e95f78e2 Mon Sep 17 00:00:00 2001 From: rbroderi Date: Sat, 30 Apr 2022 16:21:37 -0400 Subject: [PATCH 5/6] cleanup (black and flake8) --- src/sshkeyboard/_asyncio_run_backport_36.py | 20 +++--- src/sshkeyboard/_asyncio_run_backport_36.pyi | 15 ----- src/sshkeyboard/char_reader.py | 21 +++++- src/sshkeyboard/context_managers.py | 12 ++-- src/sshkeyboard/errors.py | 2 + src/sshkeyboard/key_listener.py | 67 ++++++++++---------- 6 files changed, 69 insertions(+), 68 deletions(-) delete mode 100644 src/sshkeyboard/_asyncio_run_backport_36.pyi diff --git a/src/sshkeyboard/_asyncio_run_backport_36.py b/src/sshkeyboard/_asyncio_run_backport_36.py index 0214329..7de4cc9 100644 --- a/src/sshkeyboard/_asyncio_run_backport_36.py +++ b/src/sshkeyboard/_asyncio_run_backport_36.py @@ -1,6 +1,8 @@ +# type: ignore """ Backport of the asyncio.runners module from Python 3.7 to Python 3.6. """ +# flake8: noqa # Source: # https://gist.github.com/nickdavies/4a37c6cd9dcc7041fddd2d2a81cee383 # https://github.com/python/cpython/blob/a4afcdfa55ddffa4b9ae3b0cf101628c7bff4102/Lib/asyncio/runners.py @@ -20,9 +22,7 @@ try: from asyncio import get_running_loop # noqa Python >=3.7 except ImportError: # pragma: no cover - from asyncio.events import ( - _get_running_loop as get_running_loop, # pragma: no cover - ) + from asyncio.events import _get_running_loop as get_running_loop # pragma: no cover __all__ = ("run36",) # noqa _T = TypeVar("_T") @@ -73,7 +73,7 @@ def _safe_task_factory(loop, coro): def run36( main: Union[Coroutine[Any, None, _T], Awaitable[_T]], *, - debug: bool = False + debug: bool = False, ) -> _T: """Run a coroutine. This function runs the passed coroutine, taking care of @@ -97,9 +97,7 @@ async def main(): except RuntimeError: loop = None if loop is not None: - raise RuntimeError( - "asyncio.run() cannot be called from a running event loop" - ) + raise RuntimeError("asyncio.run() cannot be called from a running event loop") if not asyncio.iscoroutine(main): raise ValueError("a coroutine was expected, got {!r}".format(main)) @@ -130,7 +128,7 @@ def _cancel_all_tasks(loop, tasks): task.cancel() loop.run_until_complete( - asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) + asyncio.gather(*to_cancel, loop=loop, return_exceptions=True), ) for task in to_cancel: @@ -139,10 +137,8 @@ def _cancel_all_tasks(loop, tasks): if task.exception() is not None: loop.call_exception_handler( { - "message": ( - "unhandled exception during asyncio.run() shutdown" - ), + "message": "unhandled exception during asyncio.run() shutdown", "exception": task.exception(), "task": task, - } + }, ) diff --git a/src/sshkeyboard/_asyncio_run_backport_36.pyi b/src/sshkeyboard/_asyncio_run_backport_36.pyi deleted file mode 100644 index 6ffaf59..0000000 --- a/src/sshkeyboard/_asyncio_run_backport_36.pyi +++ /dev/null @@ -1,15 +0,0 @@ -""" -This type stub file was generated by pyright. -""" - -from typing import Any, Awaitable, Coroutine, TypeVar, Union - -""" -Backport of the asyncio.runners""" -__all__ = ("run36", ) -_T = TypeVar("_T") -def run36(main: Union[Coroutine[Any, None, _T], Awaitable[_T]], *, debug: bool = ...) -> _T: - """Run a coroutine. - This functi""" - ... - diff --git a/src/sshkeyboard/char_reader.py b/src/sshkeyboard/char_reader.py index 1f6ffab..6d276a1 100644 --- a/src/sshkeyboard/char_reader.py +++ b/src/sshkeyboard/char_reader.py @@ -4,7 +4,7 @@ import sys from abc import ABC, abstractmethod from platform import system -from typing import Any, Optional +from typing import Any, Optional, Union from . import char_maps as cmap @@ -15,13 +15,19 @@ class CharReader(ABC): + """Abstract base class for character reader.""" + @abstractmethod def read(self, debug: bool = False) -> Optional[str]: + """Read a character from input.""" ... class WinCharReader(CharReader): + """Windows character reader.""" + def read(self, debug: bool = False) -> Optional[str]: + """Read a character from input.""" # Return if nothing to read if not msvcrt.kbhit(): return "" @@ -47,7 +53,10 @@ def read(self, debug: bool = False) -> Optional[str]: class UnixCharReader(CharReader): + """Unix / Linix character reader.""" + def read(self, debug: bool = False) -> Optional[str]: + """Read a character from input.""" raw: Any char: Optional[str] = self._read_unix_stdin(1) @@ -77,11 +86,14 @@ def _read_unix_stdin(self, amount: int) -> str: return "" # '\x' at the start is a good indicator for ansi character - def _is_unix_ansi(self, char: str): + def _is_unix_ansi(self, char: str) -> bool: rep = repr(char) return len(rep) >= 2 and rep[1] == "\\" and rep[2] == "x" - def _read_and_parse_unix_ansi(self, char: str): + def _read_and_parse_unix_ansi( + self, + char: str, + ) -> Union[tuple[str, str], tuple[None, str]]: char += self._read_unix_stdin(5) if char in cmap.UNIX_ANSI_CHAR_TO_READABLE: return cmap.UNIX_ANSI_CHAR_TO_READABLE[char], char @@ -90,8 +102,11 @@ def _read_and_parse_unix_ansi(self, char: str): class CharReaderFactory: + """Factory that returns the correct reader for the system.""" + @staticmethod def create() -> CharReader: + """Return system independent character reader.""" if system().lower() == "windows": return WinCharReader() return UnixCharReader() diff --git a/src/sshkeyboard/context_managers.py b/src/sshkeyboard/context_managers.py index 86e94ed..1087040 100644 --- a/src/sshkeyboard/context_managers.py +++ b/src/sshkeyboard/context_managers.py @@ -5,7 +5,7 @@ import os from contextlib import contextmanager from platform import system -from typing import IO, Any, List +from typing import IO, Any, Generator, List _is_windows = system().lower() == "windows" @@ -16,7 +16,8 @@ @contextmanager -def raw(stream: IO[Any]): +def raw(stream: IO[Any]) -> Generator[None, None, None]: + """Raw stdin from steam.""" # Not required on windows if _is_windows: yield @@ -28,12 +29,15 @@ def raw(stream: IO[Any]): yield finally: termios.tcsetattr( # type: ignore - stream, termios.TCSANOW, original_stty # type: ignore + stream, + termios.TCSANOW, # type: ignore + original_stty, # type: ignore ) @contextmanager -def nonblocking(stream: IO[Any]): +def nonblocking(stream: IO[Any]) -> Generator[None, None, None]: + """Non-blocking stdin from steam.""" # Not required on windows if _is_windows: yield diff --git a/src/sshkeyboard/errors.py b/src/sshkeyboard/errors.py index 14e1d68..daa6b62 100644 --- a/src/sshkeyboard/errors.py +++ b/src/sshkeyboard/errors.py @@ -2,4 +2,6 @@ class MultipleListenerError(Exception): + """An error if multiple listeners are trying to run at same time.""" + pass diff --git a/src/sshkeyboard/key_listener.py b/src/sshkeyboard/key_listener.py index f1627af..5c6a652 100644 --- a/src/sshkeyboard/key_listener.py +++ b/src/sshkeyboard/key_listener.py @@ -17,17 +17,7 @@ # TypeAlias, seems to cause issues with pylance # opened issue https://github.com/beartype/beartype/issues/127 - # from beartype.typing import ( - # Any, - # Awaitable, - # Callable, - # Coroutine, - # Dict, - # Optional, - # Union, - # ) -except ModuleNotFoundError: - from typing import ( + from beartype.typing import ( Any, Awaitable, Callable, @@ -36,13 +26,15 @@ Optional, Union, ) +except ModuleNotFoundError: + from typing import Any, Awaitable, Callable, Coroutine, Dict, Optional, Union FuncT = TypeVar("FuncT", bound=Callable[..., Any]) - def noop_dec(func: FuncT) -> FuncT: + def _noop_dec(func: FuncT) -> FuncT: return func - beartype: Callable[..., Any] = noop_dec # type: ignore[no-redef] + beartype: Callable[..., Any] = _noop_dec # type: ignore[no-redef] from .char_reader import CharReaderFactory @@ -50,7 +42,7 @@ def noop_dec(func: FuncT) -> FuncT: from .errors import MultipleListenerError try: - from ._asyncio_run_backport_36 import run36 + from ._asyncio_run_backport_36 import run36 # type: ignore except ImportError: # this allows local testing: python __init__.py from _asyncio_run_backport_36 import run36 # type:ignore @@ -109,11 +101,12 @@ def __post_init__( on_press_func: Optional[CallbackFunction], on_release_func: Optional[CallbackFunction], ) -> None: + """Set on_press and on_release after dataclass init.""" # Check the system if sys.version_info < (3, 6): raise RuntimeError( - "sshkeyboard requires Python version 3.6+, you have " - f"{sys.version_info.major}.{sys.version_info.minor}" + f"{__package__} requires Python version 3.6+, you have " + f"{sys.version_info.major}.{sys.version_info.minor}", ) self.on_press = self._callback(on_press_func) self.on_release = self._callback(on_release_func) @@ -127,17 +120,16 @@ def __post_init__( or not asyncio.iscoroutinefunction(self.on_release) ): self._executor = concurrent.futures.ThreadPoolExecutor( - max_workers=self.max_thread_pool_workers + max_workers=self.max_thread_pool_workers, ) def _callback( - self, cb_function: Optional[CallbackFunction] + self, + cb_function: Optional[CallbackFunction], ) -> Callable[[str], Coroutine[Any, Any, Any]]: - async def _cb(key: Key): + async def _cb(key: Key) -> None: def _done( - task: Union[ - asyncio.Future[Any], concurrent.futures.Future[None] - ] + task: Union[asyncio.Future[Any], concurrent.futures.Future[None]], ) -> None: if not task.cancelled(): ex = task.exception() @@ -155,9 +147,7 @@ def _done( cb_function(key) else: if asyncio.iscoroutinefunction(cb_function): - task: asyncio.Task[None] = asyncio.create_task( - cb_function(key) - ) + task: asyncio.Task[None] = asyncio.create_task(cb_function(key)) task.add_done_callback(_done) else: future = self.executor.submit(cb_function, key) @@ -167,14 +157,13 @@ def _done( @property def running(self) -> bool: + """Return current status of if the listener is running.""" return self._running def listen_keyboard( self, ) -> None: - - """Listen for keyboard events and fire `on_press` and `on_release` - callback functions + """Listen for events and fire `on_press` and `on_release` cb functions. Supports asynchronous callbacks also. @@ -188,18 +177,20 @@ def listen_keyboard( from sshkeyboard import listen_keyboard + async def press(key): print(f"'{key}' pressed") + def release(key): print(f"'{key}' released") + listen_keyboard( on_press=press, on_release=release, ) """ - coro = self.listen_keyboard_manual() # if python is version 3.6 @@ -211,7 +202,9 @@ def release(key): async def listen_keyboard_manual( self, ) -> None: - """The same as :func:`~sshkeyboard.listen_keyboard`, but now the + """Listen for keyboard events, awaiting must be handled by caller. + + The same as :func:`~sshkeyboard.listen_keyboard`, but now the awaiting must be handled by the caller .. code-block:: python @@ -251,14 +244,18 @@ async def listen_keyboard_manual( async def _react_to_input( self, - press_time: float = time(), - initial_press_time: float = time(), + press_time: Optional[float] = None, + initial_press_time: Optional[float] = None, previous: str = "", current: Optional[str] = "", ) -> Dict[str, Any]: # Read next character current = self.char_reader.read(self.debug) + if press_time is None: + press_time = time() + if initial_press_time is None: + initial_press_time = time() # Skip and continue if read failed if current is None: return { @@ -321,13 +318,13 @@ async def _react_to_input( "current": current, } - def _clean_up(self): + def _clean_up(self) -> None: if self.executor is not None: self.executor.shutdown() self._running = False def stop_listening(self) -> None: - """Stops the ongoing keyboard listeners + """Stop the ongoing keyboard listeners. Can be called inside the callbacks or from outside. Does not do anything if listener is not running. @@ -339,11 +336,13 @@ def stop_listening(self) -> None: from sshkeyboard import listen_keyboard, stop_listening + def press(key): print(f"'{key}' pressed") if key == "z": stop_listening() + listen_keyboard(on_press=press) """ if self.running: From 97b479bd726970dc8c0867d7bf8185297842764c Mon Sep 17 00:00:00 2001 From: rbroderi Date: Sat, 30 Apr 2022 17:06:52 -0400 Subject: [PATCH 6/6] removed some errors that pylance was throwing --- src/sshkeyboard/__init__.py | 3 +- src/sshkeyboard/char_maps.py | 210 ++++++++++++++++---------------- src/sshkeyboard/char_reader.py | 22 ++-- src/sshkeyboard/key_listener.py | 4 +- 4 files changed, 122 insertions(+), 117 deletions(-) diff --git a/src/sshkeyboard/__init__.py b/src/sshkeyboard/__init__.py index d276f03..b3f51d1 100644 --- a/src/sshkeyboard/__init__.py +++ b/src/sshkeyboard/__init__.py @@ -1,5 +1,6 @@ """sshkeyboard""" - +# flake8: noqa +# pyright: reportUnusedImport=false __version__ = "3.0.0" from .key_listener import Key, KeyListener diff --git a/src/sshkeyboard/char_maps.py b/src/sshkeyboard/char_maps.py index 989c878..5391115 100644 --- a/src/sshkeyboard/char_maps.py +++ b/src/sshkeyboard/char_maps.py @@ -6,109 +6,113 @@ # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/input/ansi_escape_sequences.py # Listener does not support modifier keys for now -UNIX_ANSI_CHAR_TO_READABLE: Final = { - # 'Regular' characters - "\x1b": "esc", - "\x7f": "backspace", - "\x1b[2~": "insert", - "\x1b[3~": "delete", - "\x1b[5~": "pageup", - "\x1b[6~": "pagedown", - "\x1b[H": "home", - "\x1b[F": "end", - "\x1b[A": "up", - "\x1b[B": "down", - "\x1b[C": "right", - "\x1b[D": "left", - "\x1bOP": "f1", - "\x1bOQ": "f2", - "\x1bOR": "f3", - "\x1bOS": "f4", - "\x1b[15~": "f5", - "\x1b[17~": "f6", - "\x1b[18~": "f7", - "\x1b[19~": "f8", - "\x1b[20~": "f9", - "\x1b[21~": "f10", - "\x1b[23~": "f11", - "\x1b[24~": "f12", - "\x1b[25~": "f13", - "\x1b[26~": "f14", - "\x1b[28~": "f15", - "\x1b[29~": "f16", - "\x1b[31~": "f17", - "\x1b[32~": "f18", - "\x1b[33~": "f19", - "\x1b[34~": "f20", - # Special/duplicate: - # Tmux, Emacs - "\x1bOH": "home", - "\x1bOF": "end", - "\x1bOA": "up", - "\x1bOB": "down", - "\x1bOC": "right", - "\x1bOD": "left", - # Rrvt - "\x1b[1~": "home", - "\x1b[4~": "end", - "\x1b[11~": "f1", - "\x1b[12~": "f2", - "\x1b[13~": "f3", - "\x1b[14~": "f4", - # Linux console - "\x1b[[A": "f1", - "\x1b[[B": "f2", - "\x1b[[C": "f3", - "\x1b[[D": "f4", - "\x1b[[E": "f5", - # Xterm - "\x1b[1;2P": "f13", - "\x1b[1;2Q": "f14", - "\x1b[1;2S": "f16", - "\x1b[15;2~": "f17", - "\x1b[17;2~": "f18", - "\x1b[18;2~": "f19", - "\x1b[19;2~": "f20", - "\x1b[20;2~": "f21", - "\x1b[21;2~": "f22", - "\x1b[23;2~": "f23", - "\x1b[24;2~": "f24", -} -WIN_CHAR_TO_READABLE: Final = { - "\x1b": "esc", - "\x08": "backspace", - "àR": "insert", - "àS": "delete", - "àI": "pageup", - "àQ": "pagedown", - "àG": "home", - "àO": "end", - "àH": "up", - "àP": "down", - "àM": "right", - "àK": "left", - "\x00;": "f1", - "\x00<": "f2", - "\x00=": "f3", - "\x00>": "f4", - "\x00?": "f5", - "\x00@": "f6", - "\x00A": "f7", - "\x00B": "f8", - "\x00C": "f9", - "\x00D": "f10", - # "": "f11", ? - "à†": "f12", -} +class CharMaps: + """Holds character mapping constants.""" -# Some non-ansi characters that need a readable representation -CHAR_TO_READABLE: Final = { - "\t": "tab", - "\n": "enter", - "\r": "enter", - " ": "space", -} + UNIX_ANSI_CHAR_TO_READABLE: Final = { + # 'Regular' characters + "\x1b": "esc", + "\x7f": "backspace", + "\x1b[2~": "insert", + "\x1b[3~": "delete", + "\x1b[5~": "pageup", + "\x1b[6~": "pagedown", + "\x1b[H": "home", + "\x1b[F": "end", + "\x1b[A": "up", + "\x1b[B": "down", + "\x1b[C": "right", + "\x1b[D": "left", + "\x1bOP": "f1", + "\x1bOQ": "f2", + "\x1bOR": "f3", + "\x1bOS": "f4", + "\x1b[15~": "f5", + "\x1b[17~": "f6", + "\x1b[18~": "f7", + "\x1b[19~": "f8", + "\x1b[20~": "f9", + "\x1b[21~": "f10", + "\x1b[23~": "f11", + "\x1b[24~": "f12", + "\x1b[25~": "f13", + "\x1b[26~": "f14", + "\x1b[28~": "f15", + "\x1b[29~": "f16", + "\x1b[31~": "f17", + "\x1b[32~": "f18", + "\x1b[33~": "f19", + "\x1b[34~": "f20", + # Special/duplicate: + # Tmux, Emacs + "\x1bOH": "home", + "\x1bOF": "end", + "\x1bOA": "up", + "\x1bOB": "down", + "\x1bOC": "right", + "\x1bOD": "left", + # Rrvt + "\x1b[1~": "home", + "\x1b[4~": "end", + "\x1b[11~": "f1", + "\x1b[12~": "f2", + "\x1b[13~": "f3", + "\x1b[14~": "f4", + # Linux console + "\x1b[[A": "f1", + "\x1b[[B": "f2", + "\x1b[[C": "f3", + "\x1b[[D": "f4", + "\x1b[[E": "f5", + # Xterm + "\x1b[1;2P": "f13", + "\x1b[1;2Q": "f14", + "\x1b[1;2S": "f16", + "\x1b[15;2~": "f17", + "\x1b[17;2~": "f18", + "\x1b[18;2~": "f19", + "\x1b[19;2~": "f20", + "\x1b[20;2~": "f21", + "\x1b[21;2~": "f22", + "\x1b[23;2~": "f23", + "\x1b[24;2~": "f24", + } -WIN_SPECIAL_CHAR_STARTS: Final = {"\x1b", "\x08", "\x00", "\xe0"} -WIN_REQUIRES_TWO_READS_STARTS: Final = {"\x00", "\xe0"} + WIN_CHAR_TO_READABLE: Final = { + "\x1b": "esc", + "\x08": "backspace", + "àR": "insert", + "àS": "delete", + "àI": "pageup", + "àQ": "pagedown", + "àG": "home", + "àO": "end", + "àH": "up", + "àP": "down", + "àM": "right", + "àK": "left", + "\x00;": "f1", + "\x00<": "f2", + "\x00=": "f3", + "\x00>": "f4", + "\x00?": "f5", + "\x00@": "f6", + "\x00A": "f7", + "\x00B": "f8", + "\x00C": "f9", + "\x00D": "f10", + # "": "f11", ? + "à†": "f12", + } + + # Some non-ansi characters that need a readable representation + CHAR_TO_READABLE: Final = { + "\t": "tab", + "\n": "enter", + "\r": "enter", + " ": "space", + } + + WIN_SPECIAL_CHAR_STARTS: Final = {"\x1b", "\x08", "\x00", "\xe0"} + WIN_REQUIRES_TWO_READS_STARTS: Final = {"\x00", "\xe0"} diff --git a/src/sshkeyboard/char_reader.py b/src/sshkeyboard/char_reader.py index 6d276a1..97808be 100644 --- a/src/sshkeyboard/char_reader.py +++ b/src/sshkeyboard/char_reader.py @@ -6,7 +6,7 @@ from platform import system from typing import Any, Optional, Union -from . import char_maps as cmap +from .char_maps import CharMaps as cMap _is_windows = system().lower() == "windows" @@ -33,21 +33,21 @@ def read(self, debug: bool = False) -> Optional[str]: return "" char = msvcrt.getwch() - if char in cmap.WIN_SPECIAL_CHAR_STARTS: + if char in cMap.WIN_SPECIAL_CHAR_STARTS: # Check if requires one more read - if char in cmap.WIN_REQUIRES_TWO_READS_STARTS: + if char in cMap.WIN_REQUIRES_TWO_READS_STARTS: char += msvcrt.getwch() - if char in cmap.WIN_CHAR_TO_READABLE: - return cmap.WIN_CHAR_TO_READABLE[char] + if char in cMap.WIN_CHAR_TO_READABLE: + return cMap.WIN_CHAR_TO_READABLE[char] else: if debug: print(f"Non-supported win char: {repr(char)}") return None # Change some character representations to readable strings - elif char in cmap.CHAR_TO_READABLE: - char = cmap.CHAR_TO_READABLE[char] + elif char in cMap.CHAR_TO_READABLE: + char = cMap.CHAR_TO_READABLE[char] return char @@ -74,8 +74,8 @@ def read(self, debug: bool = False) -> Optional[str]: print(f"Non-supported ansi char: {repr(raw)}") return None # Change some character representations to readable strings - elif char in cmap.CHAR_TO_READABLE: - char = cmap.CHAR_TO_READABLE[char] + elif char in cMap.CHAR_TO_READABLE: + char = cMap.CHAR_TO_READABLE[char] return char @@ -95,8 +95,8 @@ def _read_and_parse_unix_ansi( char: str, ) -> Union[tuple[str, str], tuple[None, str]]: char += self._read_unix_stdin(5) - if char in cmap.UNIX_ANSI_CHAR_TO_READABLE: - return cmap.UNIX_ANSI_CHAR_TO_READABLE[char], char + if char in cMap.UNIX_ANSI_CHAR_TO_READABLE: + return cMap.UNIX_ANSI_CHAR_TO_READABLE[char], char else: return None, char diff --git a/src/sshkeyboard/key_listener.py b/src/sshkeyboard/key_listener.py index 5c6a652..7e3a815 100644 --- a/src/sshkeyboard/key_listener.py +++ b/src/sshkeyboard/key_listener.py @@ -29,9 +29,9 @@ except ModuleNotFoundError: from typing import Any, Awaitable, Callable, Coroutine, Dict, Optional, Union - FuncT = TypeVar("FuncT", bound=Callable[..., Any]) + T = TypeVar("T") - def _noop_dec(func: FuncT) -> FuncT: + def _noop_dec(func: T) -> T: return func beartype: Callable[..., Any] = _noop_dec # type: ignore[no-redef]