diff --git a/simulators/active_surface/__init__.py b/simulators/active_surface/__init__.py index ae99c7a..07899b3 100644 --- a/simulators/active_surface/__init__.py +++ b/simulators/active_surface/__init__.py @@ -1,5 +1,4 @@ import time -from threading import Thread, Event from socketserver import ThreadingTCPServer from simulators import utils from simulators.common import ListeningSystem @@ -67,7 +66,6 @@ class System(ListeningSystem): slope_time = 10 # msec def __init__(self, min_usd_index=0, max_usd_index=31): - self.initialized = False if min_usd_index < 0 or min_usd_index > 31: raise ValueError( 'Choose a minimum USD index between 0 and 31!' @@ -81,27 +79,10 @@ def __init__(self, min_usd_index=0, max_usd_index=31): 'max_usd_index cannot be lower than min_usd_index!' ) self._set_default() - self.stop = Event() self.min_usd_index = min_usd_index self.drivers = {} for index in range(min_usd_index, max_usd_index + 1): self.drivers[index] = USD(index) - self.positioning_thread = Thread( - target=self._positioning, - args=(self.drivers, self.stop) - ) - self.positioning_thread.daemon = True - self.initialized = True - self.positioning_thread.start() - - def __del__(self): - self.system_stop() - - def system_stop(self): - if self.initialized: - self.stop.set() - self.positioning_thread.join() - return super().system_stop() def _set_default(self): """Resets the received command string to its default value. @@ -951,28 +932,3 @@ def _set_working_mode(self, params): else: self.drivers[params[0]].set_working_mode(params[2]) return self.byte_ack - - @staticmethod - def _positioning(drivers, stop): - """This method runs in a dedicated thread. Its purpose its to call the - `calc_position` method for each USD of the line. Using a thread for - each USD will result in having 1116 concurrent threads, which are - definitely too many threads. Using this behavior, the number of - concurrent threads is limited to 96, one for each single line. - - :param drivers: the list of driver objects of the line - :param stop: the signal that tells the thread to stop and complete - :type drivers: list - :type stop: threading.Event""" - t0 = time.time() - while not stop.is_set(): - t1 = time.time() - elapsed = t1 - t0 - t0 = t1 - - for driver in drivers.values(): - driver.calc_position(t1, elapsed) - - t2 = time.time() - elapsed = t2 - t1 - time.sleep(max(0.01 - elapsed, 0)) diff --git a/simulators/active_surface/usd.py b/simulators/active_surface/usd.py index ff910c4..ecb3b89 100644 --- a/simulators/active_surface/usd.py +++ b/simulators/active_surface/usd.py @@ -1,4 +1,5 @@ import time +from threading import RLock from queue import Queue from simulators.utils import sign @@ -46,20 +47,22 @@ class USD: } def __init__(self, usd_index): + self.usd_index = usd_index self._set_default() self.last_movement = None - self.usd_index = usd_index + self._update_threshold = 0.01 + self._lock = RLock() def _set_default(self): """Called at initialization or reset phases to restore default parameters.""" self.reference_position = 0 - self.current_position = 0 + self._current_position = 0 self.position_queue = Queue() self.delay_multiplier = 5 # x * delay_step. 255: no response self.standby_delay_multiplier = 0 self.standby_mode = self.standby_modes.get(0) - self.current_percentage = 1.0 + self._current_percentage = 1.0 self.version = [1, 3] # Major, minor self.driver_type = 0x21 # 0x20: USD50xxx, 0x21: USD60xxx self.slope_delayer = 1 @@ -82,16 +85,106 @@ def _set_default(self): # I/O lines combination assumed by the driver after positioning to HOME self.home_io_level = [0, 0, 0] self.home_io_enable = [0, 0, 0] - self.running = False # True: driver is moving + self._running = False # True: driver is moving self.delayed_execution = False # True: positioning by TRIGGER event self.ready = False # True: new position enqueued - self.full_current = True + self._full_current = True self.auto_resolution = False self.resolution = self.resolutions.get(1) self.velocity = 0 self.baud_rate = self.baud_rates.get(0) - self.cmd_position = None - self.standby = False + self._cmd_position = None + self._standby = False + self._last_update_time = time.time() + + def _update_state(self): + with self._lock: + now = time.time() + elapsed = now - self._last_update_time + + if elapsed < self._update_threshold: + return + + self._last_update_time = now + + velocity = None + cmd_position = None + + if self.velocity: + velocity = self.velocity + self._running = True + elif self._cmd_position is not None: + cmd_position = self._cmd_position + self._running = True + else: + self._running = False + + if self._running: + self._current_percentage = 1.0 + self._full_current = True + self._standby = False + self.last_movement = now + + if velocity: + frequency = abs(velocity) + sign0 = sign(velocity) + elif cmd_position is not None: + frequency = self.max_frequency + sign0 = sign(cmd_position - self._current_position) + + if sign0 != 0: + steps_per_second = frequency * (128.0 / self.resolution) + delta_steps = int(steps_per_second * elapsed) + + if delta_steps > 0: + change = sign0 * delta_steps + new_position = self._current_position + change + + if cmd_position is not None: + if sign(cmd_position - new_position) != sign0: + new_position = cmd_position + self._cmd_position = None + self._running = False + self.last_movement = now + + new_position = max(new_position, self.min_position) + new_position = min(new_position, self.max_position) + + self._current_position = new_position + + if not self._running and not self._standby: + if self.last_movement: + time_stopped = now - self.last_movement + standby_delay = ( + self.standby_delay_multiplier * self.standby_delay_step + ) + if time_stopped >= standby_delay: + self._current_percentage = 1.0 - self.standby_mode + if self.standby_mode > 0: + self._full_current = False + else: + self._full_current = True + self._standby = True + + @property + def current_position(self): + self._update_state() + return self._current_position + + @property + def running(self): + self._update_state() + return self._running + + @property + def full_current(self): + self._update_state() + return self._full_current + + @property + def current_percentage(self): + self._update_state() + return self._current_percentage def soft_reset(self): """Resets the USD to it default values. When a reset is performed, the @@ -112,9 +205,9 @@ def soft_trigger(self): next_position, absolute = self.position_queue.get() if not self.velocity: if absolute: - self.cmd_position = next_position + self._cmd_position = next_position else: - self.cmd_position = self.current_position + next_position + self._cmd_position = self.current_position + next_position if self.position_queue.empty() is True: self.ready = False @@ -129,7 +222,8 @@ def soft_stop(self): """Immediate stop. The USD stops even if it did not completed its previous positioning movement.""" self.velocity = None - self.cmd_position = None + self._cmd_position = None + self._running = False def get_position(self): """Returns the USD current position. @@ -349,7 +443,7 @@ def set_absolute_position(self, position): # Driver is already moving, return False # so the parser can return a byte_nak return False - self.cmd_position = cmd_position + self._cmd_position = cmd_position return True def set_relative_position(self, position): @@ -378,7 +472,7 @@ def set_relative_position(self, position): # Driver is already moving, return False # so the parser can return a byte_nak return False - self.cmd_position = cmd_position + self._cmd_position = cmd_position return True def rotate(self, rotation_sign): @@ -397,7 +491,7 @@ def rotate(self, rotation_sign): # so the parser can return a byte_nak return False else: - self.cmd_position = rotation_sign * self.out_of_scale_position + self._cmd_position = rotation_sign * self.out_of_scale_position return True def set_velocity(self, velocity): @@ -414,7 +508,7 @@ def set_velocity(self, velocity): elif velocity == 0: velocity = None # Start rotating with given velocity without an acceleration ramp - self.cmd_position = None + self._cmd_position = None self.velocity = velocity return True @@ -517,66 +611,3 @@ def set_working_mode(self, params): # binary_string[0:7] is currently unused self.baud_rate = self.baud_rates.get(int(binary_string[7], 2)) # params[1] is currently unused - - def calc_position(self, now, elapsed): - """Calculates the current position of the USD considering its current - status and previously set parameters, along with the elapsed time since - last position calculation. - - :param now: the current time in UNIX time - :param elapsed: the elapsed time in seconds since last position - calculation - :type elapsed: float""" - velocity = None - cmd_position = None - if self.velocity: - velocity = self.velocity - self.running = True - elif self.cmd_position is not None: - cmd_position = self.cmd_position - self.running = True - else: - self.running = False - - if self.running: - self.current_percentage = 1.0 - self.full_current = True - self.standby = False - self.last_movement = now - if velocity: - frequency = abs(velocity) - sign0 = sign(velocity) - elif cmd_position is not None: - # To be replaced with the slope frequency calculation - frequency = self.max_frequency - sign0 = sign(cmd_position - self.current_position) - - new_position = None - if sign0: - steps_per_second = frequency * (128.0 / self.resolution) - delta = sign0 * int(round(steps_per_second * elapsed)) - new_position = self.current_position + delta - new_position = max(new_position, self.min_position) - new_position = min(new_position, self.max_position) - if new_position is not None: - if cmd_position is not None: - sign1 = sign(cmd_position - new_position) - if sign1 != sign0: - new_position = cmd_position - self.cmd_position = None - self.running = False - self.current_position = new_position - if not self.running and not self.standby: - if self.last_movement: - elapsed = now - self.last_movement - standby_delay = ( - self.standby_delay_multiplier * self.standby_delay_step - ) - if elapsed >= standby_delay: - self.current_percentage = 1.0 - self.standby_mode - if self.standby_mode > 0: - self.full_current = False - else: - self.full_current = True - self.last_movement = None - self.standby = True diff --git a/simulators/acu/__init__.py b/simulators/acu/__init__.py index 94cfc4c..7deb077 100644 --- a/simulators/acu/__init__.py +++ b/simulators/acu/__init__.py @@ -1,6 +1,6 @@ import time from datetime import datetime, timedelta, timezone -from threading import Thread, Event +from threading import Thread, Event, RLock from queue import Queue, Empty from socketserver import ThreadingTCPServer from simulators import utils @@ -99,8 +99,8 @@ def __init__(self, sampling_time=default_sampling_time): statuses.append(self.FS.status) self._update_status(self.status, statuses) - self.subscribe_q = Queue() - self.unsubscribe_q = Queue() + self.subscribers = [] + self.subscribers_lock = RLock() args = ( self.stop, @@ -111,8 +111,8 @@ def __init__(self, sampling_time=default_sampling_time): self.command_threads, self._update_subsystems, self._update_status, - self.subscribe_q, - self.unsubscribe_q + self.subscribers, + self.subscribers_lock ) self.update_thread = Thread( @@ -194,22 +194,11 @@ def _update_status(status, statuses): @staticmethod def _update_loop(stop, sampling_time, status, subsystems, statuses, cmd_queue, update_subsystems, update_status, - subscribe_q, unsubscribe_q): + subscribers, subscribers_lock): command_threads = [] nxt = None - counter = 0 - subscribers = [] + while not stop.is_set(): - try: - sub = subscribe_q.get_nowait() - subscribers.append(sub) - except Empty: - pass - try: - sub = unsubscribe_q.get_nowait() - subscribers.remove(sub) - except Empty: - pass try: command_threads.append(cmd_queue.get_nowait()) except Empty: @@ -221,29 +210,26 @@ def _update_loop(stop, sampling_time, status, subsystems, statuses, ) update_subsystems(subsystems) + update_status(status, statuses) - if counter % 20 == 0: - update_status(status, statuses) - for q in subscribers: - while True: - try: - q.get_nowait() - except Empty: - break - q.put(bytes(status)) - now = utils.bytes_to_real(status[721:729], precision=2) - now = utils.mjd_to_date(now) - counter = 0 - else: - now = datetime.now(timezone.utc) + with subscribers_lock: + current_subscribers = list(subscribers) - counter += 1 + for q in current_subscribers: + while True: + try: + q.get_nowait() + except Empty: + break + q.put(bytes(status)) + now = utils.bytes_to_real(status[721:729], precision=2) + now = utils.mjd_to_date(now) correction = 0 if nxt: correction = max(0, (now - nxt).total_seconds()) now = nxt - nxt = now + timedelta(seconds=sampling_time / 20.) + nxt = now + timedelta(seconds=sampling_time) sleep_for = (nxt - datetime.now(timezone.utc)).total_seconds() sleep_for -= correction @@ -253,10 +239,14 @@ def _update_loop(stop, sampling_time, status, subsystems, statuses, cmd_queue.put(command_thread) def subscribe(self, q): - self.subscribe_q.put(q) + with self.subscribers_lock: + if q not in self.subscribers: + self.subscribers.append(q) def unsubscribe(self, q): - self.unsubscribe_q.put(q) + with self.subscribers_lock: + if q in self.subscribers: + self.subscribers.remove(q) def _parse_commands(self, msg): cmds_number = utils.string_to_int(msg[12:16]) diff --git a/simulators/minor_servos/__init__.py b/simulators/minor_servos/__init__.py index d41585c..29d1295 100644 --- a/simulators/minor_servos/__init__.py +++ b/simulators/minor_servos/__init__.py @@ -103,16 +103,11 @@ def __init__(self, timer_value=DEFAULT_TIMER_VALUE, rest_api=True): 'DR_GFR3': Derotator('GFR3', self.stop), 'DR_PFP': Derotator('PFP', self.stop), } + self.cover_timer = threading.Timer(1, lambda: None) setup_import( list(self.servos.keys()) + ['GREGORIAN_CAP'], self.configurations ) - self.update_thread = threading.Thread( - target=self._update, - args=(self.stop, self.servos) - ) - self.update_thread.daemon = True - self.update_thread.start() self.rest_api = rest_api if self.rest_api: self.httpserver = HTTPServer( @@ -123,16 +118,12 @@ def __init__(self, timer_value=DEFAULT_TIMER_VALUE, rest_api=True): target=self.httpserver.serve_forever ) self.server_thread.start() - self.cover_timer = threading.Timer(1, lambda: None) - self._running = True def __del__(self): # skip coverage - if self._running: - self.system_stop() + self.system_stop() def system_stop(self): self.stop.set() - self.update_thread.join() if self.cover_timer: if self.cover_timer.is_alive(): self.cover_timer.cancel() @@ -145,17 +136,8 @@ def system_stop(self): self.httpserver.server_close() self.server_thread.join() retval = super().system_stop() - self._running = False return retval - @staticmethod - def _update(stop, servos): - while not stop.is_set(): - now = time.time() - for _, servo in servos.items(): - servo.get_status(now) - time.sleep(0.01) - def parse(self, byte): self.msg += byte if self.msg.endswith(self.tail): @@ -269,7 +251,7 @@ def _stow(self, args): servo.operative_mode_timer = threading.Timer( self.timer_value, _change_delayed_value, - args=(servo, "operative_mode", 20) # STOW + args=(servo, "_operative_mode", 20) # STOW ) servo.operative_mode_timer.daemon = True servo.operative_mode_timer.start() @@ -408,19 +390,21 @@ class Servo: def __init__(self, name, dof=1, stop=None): self.name = name + self._lock = threading.RLock() self.enabled = 1 self.status = 1 self.block = 2 - self.operative_mode = 0 + self._operative_mode = 0 self.future_oper_mode = 0 self.DOF = dof - self.coords = [0] * self.DOF - self.cmd_coords = self.coords.copy() + self._coords = [0] * self.DOF + self.cmd_coords = self._coords.copy() self.offsets = [0] * self.DOF - self.last_status_read = 0 + self._last_update_time = time.time() + self._update_threshold = 0.01 self.operative_mode_timer = threading.Timer(1, lambda: None) if self.program_track_capable: - self.trajectory_lock = threading.Lock() + self.trajectory_lock = threading.RLock() self.trajectory_id = None self.trajectory_start_time = None self.trajectory_point_id = None @@ -448,6 +432,89 @@ def __del__(self): except RuntimeError: pass + @property + def operative_mode(self): + self._update_state() + return self._operative_mode + + @operative_mode.setter + def operative_mode(self, value): + self._update_state() + self._operative_mode = value + + @property + def coords(self): + self._update_state() + return self._coords + + def _update_state(self, now=None): + if now is None: + now = time.time() + with self._lock: + elapsed = now - self._last_update_time + + if elapsed < self._update_threshold: + return + + self._last_update_time = now + + if self._operative_mode == 50: + pt_table = [] + with self.trajectory_lock: + pt_table = self.pt_table + if pt_table: + first_time = self.trajectory[0][0] + last_time = self.trajectory[0][-1] + if now >= first_time: + for index in range(self.DOF): + coord = splev(now, pt_table[index]).item(0) + if math.isnan(coord): # skip coverage + continue + coord = max(coord, self.min_coord[index]) + coord = min(coord, self.max_coord[index]) + delta = coord - self._coords[index] + direction = sign(delta) + delta = min( + self.max_delta[index] * elapsed, + abs(delta) + ) + self._coords[index] += direction * delta + if now > last_time: + with self.trajectory_lock: + self.trajectory_id = None + self.trajectory_start_time = None + self.trajectory_point_id = None + self.trajectory = [[] for _ in range(self.DOF + 1)] + self.pt_table = [] + elif self._operative_mode in [20, 30]: + self.cmd_coords = self._coords + elif self._coords != self.cmd_coords or self.future_oper_mode != 0: + coords = [] + for i in range(self.DOF): + if self._coords[i] == self.cmd_coords[i]: + coords.append(self.cmd_coords[i]) + continue + + diff = self.cmd_coords[i] - self._coords[i] + dist = abs(diff) + step = self.max_delta[i] * elapsed + coords.append( + self.cmd_coords[i] if step >= dist + else self._coords[i] + sign(diff) * step + ) + self._coords = coords + if self._coords == self.cmd_coords: + self._operative_mode = self.future_oper_mode + self.future_oper_mode = 0 + + def get_status(self, now): + self._update_state(now) + answer = f',{self.name}_ENABLED={self.enabled}|' + answer += f'{self.name}_STATUS={self.status}|' + answer += f'{self.name}_BLOCK={self.block}|' + answer += f'{self.name}_OPERATIVE_MODE={self._operative_mode}|' + return answer + def _program_track_worker(self, stop): while not stop.is_set(): try: @@ -490,61 +557,8 @@ def _program_track_worker(self, stop): )) self.pt_table = pt_table - def get_status(self, now): - elapsed = now - self.last_status_read - self.last_status_read = now - answer = f',{self.name}_ENABLED={self.enabled}|' - answer += f'{self.name}_STATUS={self.status}|' - answer += f'{self.name}_BLOCK={self.block}|' - answer += f'{self.name}_OPERATIVE_MODE={self.operative_mode}|' - if self.operative_mode == 50: - pt_table = [] - with self.trajectory_lock: - pt_table = self.pt_table - if pt_table: - first_time = self.trajectory[0][0] - last_time = self.trajectory[0][-1] - if now >= first_time: - for index in range(self.DOF): - coord = splev(now, pt_table[index]).item(0) - if math.isnan(coord): # skip coverage - continue - coord = max(coord, self.min_coord[index]) - coord = min(coord, self.max_coord[index]) - delta = coord - self.coords[index] - direction = sign(delta) - delta = min( - self.max_delta[index] * elapsed, abs(delta) - ) - self.coords[index] += direction * delta - if now > last_time: - with self.trajectory_lock: - self.trajectory_id = None - self.trajectory_start_time = None - self.trajectory_point_id = None - self.trajectory = [[] for _ in range(self.DOF + 1)] - self.pt_table = [] - elif self.operative_mode in [20, 30]: # STOW or STOP - self.cmd_coords = self.coords - elif self.coords != self.cmd_coords or self.future_oper_mode != 0: - # We commanded a preset and changed the commanded coords, move - delta = [a - b for a, b in zip(self.cmd_coords, self.coords)] - direction = sign(delta).tolist() - delta = [abs(d) for d in delta] - delta = [ - min(self.max_delta[i] * elapsed, delta[i]) - for i in range(self.DOF) - ] - coords = [ - a + b * c for a, b, c in zip(self.coords, direction, delta) - ] - self.coords = coords - if self.coords == self.cmd_coords: - self.operative_mode = self.future_oper_mode - self.future_oper_mode = 0 - return answer - def set_coords(self, coords, future_oper_mode, apply_offsets=True): + self._update_state(time.time()) for index, value in enumerate(coords): if value is None: coords[index] = self.cmd_coords[index] @@ -559,6 +573,7 @@ def set_coords(self, coords, future_oper_mode, apply_offsets=True): return True def set_offsets(self, coords): + self._update_state(time.time()) for index, value in enumerate(coords): self.offsets[index] = value @@ -589,9 +604,9 @@ def get_status(self, now): answer += f'PFP_ELONG_Z_SLAVE={random.uniform(-10, 10):.6f}|' answer += f'PFP_ELONG_THETA_MASTER={random.uniform(-10, 10):.6f}|' answer += f'PFP_ELONG_THETA_SLAVE={random.uniform(-10, 10):.6f}|' - answer += f'PFP_TX={self.coords[0]:.6f}|' - answer += f'PFP_TZ={self.coords[1]:.6f}|' - answer += f'PFP_RTHETA={self.coords[2]:.6f}|' + answer += f'PFP_TX={self._coords[0]:.6f}|' + answer += f'PFP_TZ={self._coords[1]:.6f}|' + answer += f'PFP_RTHETA={self._coords[2]:.6f}|' answer += f'PFP_OFFSET_TX={self.offsets[0]:.6f}|' answer += f'PFP_OFFSET_TZ={self.offsets[1]:.6f}|' answer += f'PFP_OFFSET_RTHETA={self.offsets[2]:.6f}' @@ -627,12 +642,12 @@ def get_status(self, now): answer += f'SRP_ELONG_Y1={random.uniform(-10, 10):.6f}|' answer += f'SRP_ELONG_Y2={random.uniform(-10, 10):.6f}|' answer += f'SRP_ELONG_X1={random.uniform(-10, 10):.6f}|' - answer += f'SRP_TX={self.coords[0]:.6f}|' - answer += f'SRP_TY={self.coords[1]:.6f}|' - answer += f'SRP_TZ={self.coords[2]:.6f}|' - answer += f'SRP_RX={self.coords[3]:.6f}|' - answer += f'SRP_RY={self.coords[4]:.6f}|' - answer += f'SRP_RZ={self.coords[5]:.6f}|' + answer += f'SRP_TX={self._coords[0]:.6f}|' + answer += f'SRP_TY={self._coords[1]:.6f}|' + answer += f'SRP_TZ={self._coords[2]:.6f}|' + answer += f'SRP_RX={self._coords[3]:.6f}|' + answer += f'SRP_RY={self._coords[4]:.6f}|' + answer += f'SRP_RZ={self._coords[5]:.6f}|' answer += f'SRP_OFFSET_TX={self.offsets[0]:.6f}|' answer += f'SRP_OFFSET_TY={self.offsets[1]:.6f}|' answer += f'SRP_OFFSET_TZ={self.offsets[2]:.6f}|' @@ -658,7 +673,7 @@ def get_status(self, now): answer += f'M3R_COUNTERCLOCKWISE_ENABLED={self.ccw_enabled}|' answer += f'M3R_CLOCKWISE={random.uniform(-2, 2):.6f}|' answer += f'M3R_COUNTERCLOCKWISE={random.uniform(-2, 2):.6f}|' - answer += f'M3R_ROTATION={self.coords[0]:.6f}|' + answer += f'M3R_ROTATION={self._coords[0]:.6f}|' answer += f'M3R_OFFSET={self.offsets[0]:.6f}' return answer @@ -679,7 +694,7 @@ def get_status(self, now): answer += f'GFR_COUNTERCLOCKWISE_ENABLED={self.ccw_enabled}|' answer += f'GFR_CLOCKWISE={random.uniform(-2, 2):.6f}|' answer += f'GFR_COUNTERCLOCKWISE={random.uniform(-2, 2):.6f}|' - answer += f'GFR_ROTATION={self.coords[0]:.6f}|' + answer += f'GFR_ROTATION={self._coords[0]:.6f}|' answer += f'GFR_OFFSET={self.offsets[0]:.6f}' return answer @@ -699,6 +714,6 @@ def get_status(self, now): answer = super().get_status(now) answer += f'{self.name}_ROTARY_AXIS_ENABLED=' answer += f'{self.rotary_axis_enabled}|' - answer += f'{self.name}_ROTATION={self.coords[0]:.6f}|' + answer += f'{self.name}_ROTATION={self._coords[0]:.6f}|' answer += f'{self.name}_OFFSET={self.offsets[0]:.6f}' return answer diff --git a/simulators/utils.py b/simulators/utils.py index cd69662..a43845d 100644 --- a/simulators/utils.py +++ b/simulators/utils.py @@ -883,12 +883,12 @@ def _mock_timer(self, interval, function, args=None, kwargs=None): kwargs or {} ) - def start(self): + def __enter__(self): self._patch_time.start() self._patch_sleep.start() self._patch_timer.start() - def stop(self): + def __exit__(self, *_): self._patch_time.stop() self._patch_sleep.stop() self._patch_timer.stop() diff --git a/tests/test_minor_servos.py b/tests/test_minor_servos.py index 6418db1..c034126 100644 --- a/tests/test_minor_servos.py +++ b/tests/test_minor_servos.py @@ -183,21 +183,19 @@ def test_teardown_before_timer_execution(self): # By closing here we test that the timers get canceled def test_setup(self): - fast_time = FastTimeMock(100) - fast_time.start() - configurations = self.system.configurations - for conf_name, configuration in configurations.items(): - cmd = f'SETUP={conf_name}{tail}' - for byte in cmd[:-1]: - self.assertTrue(self.system.parse(byte)) - self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') - time.sleep(120) - for _, servo in self.system.servos.items(): - expected_coords = configuration[servo.name] - if any(coord is not None for coord in expected_coords): - # Should be in SETUP mode - self.assertEqual(servo.operative_mode, 10) - fast_time.stop() + with FastTimeMock(100): + configurations = self.system.configurations + for conf_name, configuration in configurations.items(): + cmd = f'SETUP={conf_name}{tail}' + for byte in cmd[:-1]: + self.assertTrue(self.system.parse(byte)) + self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') + time.sleep(120) + for _, servo in self.system.servos.items(): + expected_coords = configuration[servo.name] + if any(coord is not None for coord in expected_coords): + # Should be in SETUP mode + self.assertEqual(servo.operative_mode, 10) def test_setup_no_wait(self): cmd = f'SETUP=Gregoriano1{tail}' @@ -219,44 +217,38 @@ def test_setup_unknown_configuration(self): self.assertRegex(self.system.parse(cmd[-1]), bad) def test_stow(self): - fast_time = FastTimeMock(100) - fast_time.start() - for servo_id, servo in self.system.servos.items(): - cmd = f'STOW={servo_id},1{tail}' + with FastTimeMock(100): + for servo_id, servo in self.system.servos.items(): + cmd = f'STOW={servo_id},1{tail}' + for byte in cmd[:-1]: + self.assertTrue(self.system.parse(byte)) + self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') + time.sleep(2 * DEFAULT_TIMER_VALUE) + self.assertEqual(servo.operative_mode, 20) # STOW mode + + def test_stow_gregorian_cap(self): + with FastTimeMock(100): + for stow_pos in [1, 2]: + cmd = f'STOW=GREGORIAN_CAP,{stow_pos}{tail}' + for byte in cmd[:-1]: + self.assertTrue(self.system.parse(byte)) + self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') + time.sleep(2 * DEFAULT_TIMER_VALUE) + self.assertEqual(self.system.gregorian_cap, stow_pos) + + def test_stow_gregorian_air_blade(self): + with FastTimeMock(100): + cmd = f'STOW=GREGORIAN_CAP,2{tail}' for byte in cmd[:-1]: self.assertTrue(self.system.parse(byte)) self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') time.sleep(2 * DEFAULT_TIMER_VALUE) - self.assertEqual(servo.operative_mode, 20) # STOW mode - fast_time.stop() - - def test_stow_gregorian_cap(self): - fast_time = FastTimeMock(100) - fast_time.start() - for stow_pos in [1, 2]: - cmd = f'STOW=GREGORIAN_CAP,{stow_pos}{tail}' + self.assertEqual(self.system.gregorian_cap, 2) + cmd = f'STOW=GREGORIAN_CAP,3{tail}' for byte in cmd[:-1]: self.assertTrue(self.system.parse(byte)) self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') - time.sleep(2 * DEFAULT_TIMER_VALUE) - self.assertEqual(self.system.gregorian_cap, stow_pos) - fast_time.stop() - - def test_stow_gregorian_air_blade(self): - fast_time = FastTimeMock(100) - fast_time.start() - cmd = f'STOW=GREGORIAN_CAP,2{tail}' - for byte in cmd[:-1]: - self.assertTrue(self.system.parse(byte)) - self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') - time.sleep(2 * DEFAULT_TIMER_VALUE) - self.assertEqual(self.system.gregorian_cap, 2) - cmd = f'STOW=GREGORIAN_CAP,3{tail}' - for byte in cmd[:-1]: - self.assertTrue(self.system.parse(byte)) - self.assertRegex(self.system.parse(cmd[-1]), f'{good}{tail}$') - self.assertEqual(self.system.gregorian_cap, 3) - fast_time.stop() + self.assertEqual(self.system.gregorian_cap, 3) def test_stow_gregorian_cap_wrong_pos(self): cmd = f'STOW=GREGORIAN_CAP,5{tail}'