-
Notifications
You must be signed in to change notification settings - Fork 4
Feature/sequential homing #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8784578
4de619e
d4256e1
26690fa
fc46c43
e9a3819
8a6bd9e
661a42e
583ad63
a7a2ea4
e8305aa
51f1575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,13 +9,15 @@ | |
| from being.can.nmt import OPERATIONAL, PRE_OPERATIONAL | ||
| from being.clock import Clock | ||
| from being.configuration import CONFIG | ||
| from being.constants import FORWARD, BACKWARD | ||
| from being.connectables import ValueOutput, MessageOutput | ||
| from being.execution import execute, block_network_graph | ||
| from being.graph import Graph, topological_sort | ||
| from being.logging import get_logger | ||
| from being.motion_player import MotionPlayer | ||
| from being.motors.blocks import MotorBlock | ||
| from being.motors.homing import HomingState | ||
| from being.motors.definitions import MotorEvent | ||
| from being.pacemaker import Pacemaker | ||
| from being.params import Parameter | ||
| from being.utils import filter_by_type | ||
|
|
@@ -62,6 +64,9 @@ def __init__(self, | |
| clock: Clock, | ||
| pacemaker: Pacemaker, | ||
| network: Optional[CanBackend] = None, | ||
| sequential_homing: bool = False, | ||
| pre_homing: bool = False, | ||
| pre_homing_direction: float = BACKWARD, | ||
| ): | ||
| """ | ||
| Args: | ||
|
|
@@ -105,6 +110,22 @@ def __init__(self, | |
| self.params: List[Parameter] = list(filter_by_type(self.execOrder, Parameter)) | ||
| """All parameter blocks.""" | ||
|
|
||
| self.sequential_homing: bool = sequential_homing | ||
| """One by one homing.""" | ||
|
|
||
| self.pre_homing: bool = pre_homing | ||
| """Moves motors to safe position before homing.""" | ||
|
|
||
| self.pre_homing_direction: float = pre_homing_direction | ||
| """Direction for safe homing.""" | ||
|
|
||
| self.motors_unhomed: Iterator = iter(self.motors) | ||
| """Iterator for sequential homing.""" | ||
|
|
||
| if sequential_homing: | ||
| for motor in self.motors: | ||
| motor.controller.subscribe(MotorEvent.HOMING_CHANGED, lambda: self.next_homing()) | ||
|
|
||
| def enable_motors(self): | ||
| """Enable all motor blocks.""" | ||
| self.logger.info('enable_motors()') | ||
|
|
@@ -120,8 +141,14 @@ def disable_motors(self): | |
| def home_motors(self): | ||
| """Home all motors.""" | ||
| self.logger.info('home_motors()') | ||
| for motor in self.motors: | ||
| motor.home() | ||
| if self.pre_homing: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interaction of pre-homing with homing is not totally clear, for instance here it looks like you either use pre-homing or sequential homing or parallel homing - whereas in fact pre_homing apparently will trigger homing internally. I'm wondering if there is a better place for all pre-homing-related as an extension of existing methods in homing.py. |
||
| for motor in self.motors: | ||
| motor.pre_home(self.pre_homing_direction) | ||
| elif self.sequential_homing: | ||
| next(self.motors_unhomed).home() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't you need to recreate the motors_unhomed iterator here? Homing can be called more than once per Being lifetime. |
||
| else: | ||
| for motor in self.motors: | ||
| motor.home() | ||
|
|
||
| def start_behaviors(self): | ||
| """Start all behaviors.""" | ||
|
|
@@ -133,6 +160,16 @@ def pause_behaviors(self): | |
| for behavior in self.behaviors: | ||
| behavior.pause() | ||
|
|
||
| def next_homing(self): | ||
| """Controls one by one homing.""" | ||
| if any(motor.homing_state() is HomingState.ONGOING for motor in self.motors): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought you may want to pass the motor instance (and probably the event type too) to this method in subscribe() like it's done in server.py - then you know directly which event you are reacting to and don't need to scan all motors. |
||
| return | ||
| else: | ||
| try: | ||
| next(self.motors_unhomed).home() | ||
| except StopIteration: | ||
| self.logger.debug('All motors are homed.') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This message may deserve level 'info'. |
||
|
|
||
| def single_cycle(self): | ||
| """Execute single being cycle. Network sync, executing block network, | ||
| advancing clock. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -204,10 +204,7 @@ def __init__(self, | |
| """Last receive state of motor controller.""" | ||
|
|
||
| # Configure node | ||
| self.apply_motor_direction(direction) | ||
| self.node.apply_settings(self.settings) | ||
| for errMsg in self.error_history_messages(): | ||
| self.logger.error(errMsg) | ||
| self.configure_node() | ||
|
|
||
| self.node.set_operation_mode(operationMode) | ||
|
|
||
|
|
@@ -267,7 +264,18 @@ def home(self): | |
| self.capture() | ||
| self.homing.home() | ||
|
|
||
| self.publish(MotorEvent.HOMING_CHANGED) | ||
| def pre_home(self, direction): | ||
| """Start pre-homing for this controller. Will start by the next call of | ||
| :meth:`Controller.update`. | ||
| """ | ||
| self.logger.debug('pre_home()') | ||
| if self.homing.ongoing: | ||
| self.homing.stop() | ||
| self.wasEnabled = False # Do not re-enable motor since not homed anymore | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't see a use of this variable anywhere. |
||
| self.restore() | ||
| else: | ||
| self.capture() | ||
| self.homing.pre_home(direction) | ||
|
|
||
| def homing_state(self) -> HomingState: | ||
| return self.homing.state | ||
|
|
@@ -293,6 +301,11 @@ def apply_motor_direction(self, direction: float): | |
| """Configure direction or orientation of controller / motor.""" | ||
| raise NotImplementedError | ||
|
|
||
| @abc.abstractmethod | ||
| def configure_node(self): | ||
| """Apply settings to the controller""" | ||
| raise NotImplementedError() | ||
|
|
||
| def format_emcy(self, emcy: EmcyError) -> str: | ||
| """Get vendor specific description of EMCY error.""" | ||
| return format_error_code(emcy.code, self.EMERGENCY_DESCRIPTIONS) | ||
|
|
@@ -431,7 +444,20 @@ def init_homing(self, **homingKwargs): | |
| else: | ||
| super().init_homing(homingMethod=method) | ||
|
|
||
| def configure_node(self): | ||
| self.apply_motor_direction(self.direction) | ||
| if self.direction < 0: | ||
| # Swap interpretation of min / max positions | ||
| minimum = self.settings['Software Position Limit/Minimum Position Limit'] | ||
| maximum = self.settings['Software Position Limit/Maximum Position Limit'] | ||
| self.settings['Software Position Limit/Maximum Position Limit'] = minimum | ||
| self.settings['Software Position Limit/Minimum Position Limit'] = maximum | ||
| self.node.apply_settings(self.settings) | ||
| for errMsg in self.error_history_messages(): | ||
| self.logger.error(errMsg) | ||
|
|
||
| def apply_motor_direction(self, direction: float): | ||
| self.logger.debug(f"Apply motor direction {direction}") | ||
| if direction >= 0: | ||
| positivePolarity = 0 | ||
| self.node.sdo['Polarity'].raw = positivePolarity | ||
|
|
@@ -506,6 +532,12 @@ def firmware_version(self) -> int: | |
| firmwareVersion = revisionNumber >> lowWord | ||
| return firmwareVersion | ||
|
|
||
| def configure_node(self): | ||
| self.apply_motor_direction(self.direction) | ||
| self.node.apply_settings(self.settings) | ||
| for errMsg in self.error_history_messages(): | ||
| self.logger.error(errMsg) | ||
|
|
||
| def apply_motor_direction(self, direction: float): | ||
| variable = self.node.sdo['Axis configuration']['Axis configuration miscellaneous'] | ||
| misc = variable.raw | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this direction be specified per motor?