From 7bc039bb0f2fc23432398d5a12b001b151dd7b12 Mon Sep 17 00:00:00 2001 From: AfromD <34109930+AfromD@users.noreply.github.com> Date: Wed, 19 Mar 2025 17:54:33 +0100 Subject: [PATCH 1/2] Added pacing for Bless calls --- pythoncode/bleBless.py | 898 +++++++++++++++++++++++++---------------- 1 file changed, 553 insertions(+), 345 deletions(-) diff --git a/pythoncode/bleBless.py b/pythoncode/bleBless.py index c7e237fe..0e9bc6d6 100644 --- a/pythoncode/bleBless.py +++ b/pythoncode/bleBless.py @@ -1,6 +1,6 @@ -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # Description -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # The bless library enables python programs to access Bluetooth Low Energy # as a server. This enables FortiusAnt to create a FTMS (FTMS FiTness Machine Server) # and communicate with a CTP (which is the client, refer bleBleak.py). @@ -8,12 +8,12 @@ # FortiusAnt uses class clsFTMS_bless, based upon clsBleServer # This file can be executed and then a simulator is started for demo/test purpose. # -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # Author https://github.com/WouterJD # wouter.dubbeldam@xs4all.nl -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # Version info -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- __version__ = "2022-08-10" # 2022-08-10 Steering implemented according marcoveeneman and switchable's code # 2022-04-12 TargetMode is initially None, so that FortiusAnt knowns that no @@ -33,51 +33,51 @@ # examples\gattserver.py # Example for a BLE 4.0 Server using a GATT dictionary of # characteristics -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- import struct import time from typing import Any, Dict from bless import ( - BlessServer, - BlessGATTCharacteristic, - GATTCharacteristicProperties, - GATTAttributePermissions - ) + BlessServer, + BlessGATTCharacteristic, + GATTCharacteristicProperties, + GATTAttributePermissions, +) if True: BlessExample = False - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Import in the FortiusAnt context - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- import debug import logfile - from constants import mode_Power, mode_Grade, UseBluetooth - from logfile import HexSpace - from bleBlessClass import clsBleServer - import bleConstants as bc + from constants import mode_Power, mode_Grade, UseBluetooth + from logfile import HexSpace + from bleBlessClass import clsBleServer + import bleConstants as bc else: BlessExample = True - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Import and Constants for bless example context - #--------------------------------------------------------------------------- - from FTMSserverClass import clsBleServer - import FTMSconstants as bc - from FTMSconstants import HexSpace + # --------------------------------------------------------------------------- + from FTMSserverClass import clsBleServer + import FTMSconstants as bc + from FTMSconstants import HexSpace import logging - mode_Power = 1 # Target Power - mode_Grade = 2 # Target Resistance - UseBluetooth = True + mode_Power = 1 # Target Power + mode_Grade = 2 # Target Resistance + UseBluetooth = True -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- # Define the server structure with services and characteristics # 2022-02-22 Note, see https://github.com/kevincar/bless/issues/67 # Value should be with a capital, but currently bless uses "value". # Therefore both Value/value are present, to avoid future issues. -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- """ Is not allowed to be added in the GATT definition: failed to create entry in database @@ -160,109 +160,139 @@ FitnessMachineGatt: Dict = { bc.sFitnessMachineUUID: { bc.cFitnessMachineFeatureUUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": bc.fmf_Info, # b'\x02\x40\x00\x00\x08\x20\x00\x00', - "value": bc.fmf_Info, - "Description": bc.cFitnessMachineFeatureName + "Properties": (GATTCharacteristicProperties.read), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": bc.fmf_Info, # b'\x02\x40\x00\x00\x08\x20\x00\x00', + "value": bc.fmf_Info, + "Description": bc.cFitnessMachineFeatureName, }, bc.cIndoorBikeDataUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": bc.ibd_Info, # Instantaneous Cadence, Power, HeartRate - "value": bc.ibd_Info, - "Description": bc.cIndoorBikeDataName + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": bc.ibd_Info, # Instantaneous Cadence, Power, HeartRate + "value": bc.ibd_Info, + "Description": bc.cIndoorBikeDataName, }, bc.cFitnessMachineStatusUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "value": b'\x00\x00', # Status as "sent" to Cycling Training Program - "Value": b'\x00\x00', - "Description": bc.cFitnessMachineStatusName + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "value": b"\x00\x00", # Status as "sent" to Cycling Training Program + "Value": b"\x00\x00", + "Description": bc.cFitnessMachineStatusName, }, bc.cFitnessMachineControlPointUUID: { - "Properties": (GATTCharacteristicProperties.write | GATTCharacteristicProperties.indicate), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\x00\x00', # Commands as received from Cycling Training Program - "value": b'\x00\x00', - "Description": bc.cFitnessMachineControlPointName + "Properties": ( + GATTCharacteristicProperties.write + | GATTCharacteristicProperties.indicate + ), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\x00\x00", # Commands as received from Cycling Training Program + "value": b"\x00\x00", + "Description": bc.cFitnessMachineControlPointName, }, bc.cSupportedPowerRangeUUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": bc.spr_Info, # Static additional properties of the FTMS - # b'\x00\x00\xe8\x03\x01\x00' - # min=0, max=1000, incr=1 - # ==> 0x0000 0x03e8 0x0001 ==> 0x0000 0xe803 0x0100 - "value": bc.spr_Info, - "Description": bc.cSupportedPowerRangeName - } + "Properties": (GATTCharacteristicProperties.read), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": bc.spr_Info, # Static additional properties of the FTMS + # b'\x00\x00\xe8\x03\x01\x00' + # min=0, max=1000, incr=1 + # ==> 0x0000 0x03e8 0x0001 ==> 0x0000 0xe803 0x0100 + "value": bc.spr_Info, + "Description": bc.cSupportedPowerRangeName, + }, }, bc.sHeartRateUUID: { bc.cHeartRateMeasurementUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": bc.hrm_Info, - "value": bc.hrm_Info, - "Description": bc.cHeartRateMeasurementName + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": bc.hrm_Info, + "value": bc.hrm_Info, + "Description": bc.cHeartRateMeasurementName, } }, bc.sSteeringUUID: { bc.cSteeringUnknown1UUID: { - "Properties": (GATTCharacteristicProperties.write), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\x00\x00', - "value": b'\x00\x00', - "Description": bc.cSteeringUnknown1Name + "Properties": (GATTCharacteristicProperties.write), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\x00\x00", + "value": b"\x00\x00", + "Description": bc.cSteeringUnknown1Name, }, bc.cSteeringUnknown2UUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\xff', - "value": b'\xff', - "Description": bc.cSteeringUnknown2Name + "Properties": (GATTCharacteristicProperties.read), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\xff", + "value": b"\xff", + "Description": bc.cSteeringUnknown2Name, }, bc.cSteeringUnknown3UUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\x00', - "value": b'\x00', - "Description": bc.cSteeringUnknown3Name + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\x00", + "value": b"\x00", + "Description": bc.cSteeringUnknown3Name, }, bc.cSteeringUnknown4UUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\xff', - "value": b'\xff', - "Description": bc.cSteeringUnknown4Name + "Properties": (GATTCharacteristicProperties.read), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\xff", + "value": b"\xff", + "Description": bc.cSteeringUnknown4Name, }, bc.cSteeringAngleUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": bc.angle_Info, - "value": bc.angle_Info, - "Description": bc.cSteeringAngleName + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": bc.angle_Info, + "value": bc.angle_Info, + "Description": bc.cSteeringAngleName, }, bc.cSteeringTxUUID: { - "Properties": (GATTCharacteristicProperties.indicate), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\x00', - "value": b'\x00', - "Description": bc.cSteeringTxName + "Properties": (GATTCharacteristicProperties.indicate), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\x00", + "value": b"\x00", + "Description": bc.cSteeringTxName, }, bc.cSteeringRxUUID: { - "Properties": (GATTCharacteristicProperties.write), - "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), - "Value": b'\x00', - "value": b'\x00', - "Description": bc.cSteeringRxName - } - } + "Properties": (GATTCharacteristicProperties.write), + "Permissions": ( + GATTAttributePermissions.readable | GATTAttributePermissions.writeable + ), + "Value": b"\x00", + "value": b"\x00", + "Description": bc.cSteeringRxName, + }, + }, } -#------------------------------------------------------------------------------- -# c l s F T M S _ b l e s s -#------------------------------------------------------------------------------- + +# ------------------------------------------------------------------------------- +# c l s F T M S _ b l e s s +# ------------------------------------------------------------------------------- # Class to create an FiTnessMachineServer, based upon bless # # User methods: @@ -276,41 +306,41 @@ # User attributes: # See parent class AND # See below -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- class clsFTMS_bless(clsBleServer): - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # CTP data, received through WriteRequest() and sent by client - #--------------------------------------------------------------------------- - TargetMode = None # No target received; and then: - # either mode_Power or mode_Grade - TargetGrade = 0 # % - TargetPower = 100 # Watt - - WindResistance = 0 - WindSpeed = 0 - DraftingFactor = 1 # Default since not supplied - RollingResistance = 0 - - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- + TargetMode = None # No target received; and then: + # either mode_Power or mode_Grade + TargetGrade = 0 # % + TargetPower = 100 # Watt + + WindResistance = 0 + WindSpeed = 0 + DraftingFactor = 1 # Default since not supplied + RollingResistance = 0 + + # --------------------------------------------------------------------------- # data provided by SetAthleteData() as called by application - #--------------------------------------------------------------------------- - HeartRate = 0 + # --------------------------------------------------------------------------- + HeartRate = 0 - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # data provided by SetTrainerData() as called by application - #--------------------------------------------------------------------------- - CurrentSpeed = 0 # km/hour - Cadence = 0 # /minute - CurrentPower = 0 # Watt - SteeringAngle = 0 # Steering is always present, - # regardless the -S command-line setting - # If no steering, value is zero. - - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- + CurrentSpeed = 0 # km/hour + Cadence = 0 # /minute + CurrentPower = 0 # Watt + SteeringAngle = 0 # Steering is always present, + # regardless the -S command-line setting + # If no steering, value is zero. + + # --------------------------------------------------------------------------- # Internal workflow control data - #--------------------------------------------------------------------------- - HasControl = False # CTP is controlling the FTMS - Started = False # A CTP training is started + # --------------------------------------------------------------------------- + HasControl = False # CTP is controlling the FTMS + Started = False # A CTP training is started # -------------------------------------------------------------------------- # _ _ i n i t _ _ @@ -325,8 +355,31 @@ class clsFTMS_bless(clsBleServer): def __init__(self, activate): if UseBluetooth and activate: super().__init__("FortiusAntTrainer", FitnessMachineGatt) + self.last_call_time = 0 + self.min_interval = 1 / 500 # 5000 times per second else: - pass # Data structure is created, no actions + pass # Data structure is created, no actions + + # -------------------------------------------------------------------------- + # Ratelimitedcall + # -------------------------------------------------------------------------- + # Input function parameters + # + # Function slows calling the bless library down + # + # Output returncode + # -------------------------------------------------------------------------- + + def rate_limited_call(self, func, *args, **kwargs): + current_time = time.time() + elapsed_time = current_time - self.last_call_time + + if elapsed_time < self.min_interval: + time.sleep(self.min_interval - elapsed_time) + + result = func(*args, **kwargs) + self.last_call_time = time.time() + return result # -------------------------------------------------------------------------- # SetAthleteData, SetTrainerData, SetSteeringAngle @@ -338,79 +391,108 @@ def __init__(self, activate): # Output OK = False # -------------------------------------------------------------------------- def SetAthleteData(self, HeartRate): - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Logging - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- self.logfileWrite("clsFTMS_bless.SetAthleteData(%s)" % HeartRate) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Remember provided data - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- self.HeartRate = HeartRate - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Update heartrate in FTMS - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- if self.OK: flags = 0 - h = int(self.HeartRate) & 0xff # Avoid value anomalities - info = struct.pack (bc.little_endian + bc.unsigned_char * 2, flags, h) - self.BlessServer.get_characteristic(bc.cHeartRateMeasurementUUID).value = info - self.BlessServer.update_value(bc.sHeartRateUUID, bc.cHeartRateMeasurementUUID) + h = int(self.HeartRate) & 0xFF + info = struct.pack(bc.little_endian + bc.unsigned_char * 2, flags, h) + characteristic = self.BlessServer.get_characteristic( + bc.cHeartRateMeasurementUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sHeartRateUUID, + bc.cHeartRateMeasurementUUID, + ) + else: - self.logfileConsole("clsFTMS_bless.SetAthleteData() error, interface not open") + self.logfileConsole( + "clsFTMS_bless.SetAthleteData() error, interface not open" + ) def SetTrainerData(self, CurrentSpeed, Cadence, CurrentPower): - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Logging - #----------------------------------------------------------------------- - self.logfileWrite("clsFTMS_bless.SetTrainerData(%s, %s, %s)" % (CurrentSpeed, Cadence, CurrentPower)) + # ----------------------------------------------------------------------- + self.logfileWrite( + "clsFTMS_bless.SetTrainerData(%s, %s, %s)" + % (CurrentSpeed, Cadence, CurrentPower) + ) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Remember provided data - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- self.CurrentSpeed = CurrentSpeed - self.Cadence = Cadence + self.Cadence = Cadence self.CurrentPower = CurrentPower - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Update trainer status in FTMS # Note that: Speed always present UNLESS...) # HeartRate not transmitted, is not used. - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- if self.OK: - flags = (bc.ibd_InstantaneousCadencePresent | bc.ibd_InstantaneousPowerPresent) - s = int(self.CurrentSpeed * 100) & 0xffff # Avoid value anomalities - c = int(self.Cadence * 2) & 0xffff # Avoid value anomalities - p = int(self.CurrentPower) & 0xffff # Avoid value anomalities - info = struct.pack (bc.little_endian + bc.unsigned_short * 4, flags, s, c, p) - - self.BlessServer.get_characteristic(bc.cIndoorBikeDataUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cIndoorBikeDataUUID) + flags = ( + bc.ibd_InstantaneousCadencePresent | bc.ibd_InstantaneousPowerPresent + ) + s = int(self.CurrentSpeed * 100) & 0xFFFF + c = int(self.Cadence * 2) & 0xFFFF + p = int(self.CurrentPower) & 0xFFFF + info = struct.pack(bc.little_endian + bc.unsigned_short * 4, flags, s, c, p) + + characteristic = self.BlessServer.get_characteristic(bc.cIndoorBikeDataUUID) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cIndoorBikeDataUUID, + ) + else: - self.logfileConsole("clsFTMS_bless.SetTrainerData() error, interface not open") - + self.logfileConsole( + "clsFTMS_bless.SetTrainerData() error, interface not open" + ) + def SetSteeringAngle(self, SteeringAngle): - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Logging - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- self.logfileWrite("clsFTMS_bless.SetSteeringAngle(%s)" % SteeringAngle) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Remember provided data - #----------------------------------------------------------------------- - self.SteeringAngle = SteeringAngle + # ----------------------------------------------------------------------- + self.SteeringAngle = SteeringAngle - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Update angle in steering - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- if self.OK: - a = SteeringAngle # Avoid value anomalities here (if needed) - info = struct.pack (bc.little_endian + bc.float, a) - self.BlessServer.get_characteristic(bc.cSteeringAngleUUID).value = info - self.BlessServer.update_value(bc.sSteeringUUID, bc.cSteeringAngleUUID) + a = SteeringAngle + info = struct.pack(bc.little_endian + bc.float, a) + characteristic = self.BlessServer.get_characteristic(bc.cSteeringAngleUUID) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, bc.sSteeringUUID, bc.cSteeringAngleUUID + ) + else: - self.logfileConsole("clsFTMS_bless.SetSteeringAngle() error, interface not open") + self.logfileConsole( + "clsFTMS_bless.SetSteeringAngle() error, interface not open" + ) # -------------------------------------------------------------------------- # C l i e n t D i s c o n n e c t e d @@ -424,7 +506,7 @@ def SetSteeringAngle(self, SteeringAngle): # -------------------------------------------------------------------------- def ClientDisconnected(self): self.HasControl = False - self.Started = False + self.Started = False # -------------------------------------------------------------------------- # R e f r e s h @@ -442,202 +524,258 @@ def ClientDisconnected(self): def Refresh(self): return self.OK - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # l o g f i l e W r i t e / C o n s o l e / T r a c e b a c k - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Input: text to be written to logfile # # Function Use our own standard logging functions # # Output: none - #--------------------------------------------------------------------------- - if not BlessExample: # As used in the FortiusAnt context + # --------------------------------------------------------------------------- + if not BlessExample: # As used in the FortiusAnt context + def logfileWrite(self, message): - if debug.on(debug.Ble): logfile.Write(message) + if debug.on(debug.Ble): + logfile.Write(message) def logfileConsole(self, message): logfile.Console(message) def logfileTraceback(self, exception): - if debug.on(debug.Ble): logfile.Traceback(exception) + if debug.on(debug.Ble): + logfile.Traceback(exception) - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # R e a d R e q u e s t - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Input: characteristic for which the value is requested # # Function Return the requested value, without further processing # Replaces parent class because of enhanced logging # # Output: characteristic.value - #--------------------------------------------------------------------------- - def ReadRequest(self, - characteristic: BlessGATTCharacteristic, - **kwargs - ) -> bytearray: - - uuid = str(characteristic._uuid) - if uuid == bc.cFitnessMachineFeatureUUID: char = bc.cFitnessMachineFeatureName - elif uuid == bc.cIndoorBikeDataUUID: char = bc.cIndoorBikeDataName - elif uuid == bc.cFitnessMachineStatusUUID: char = bc.cFitnessMachineStatusName - elif uuid == bc.cFitnessMachineControlPointUUID: char = bc.cFitnessMachineControlPointName - elif uuid == bc.cSupportedPowerRangeUUID: char = bc.cSupportedPowerRangeName - elif uuid == bc.cHeartRateMeasurementUUID: char = bc.cHeartRateMeasurementName - elif uuid == bc.cSteeringAngleUUID: char = bc.cSteeringAngleName - else: char = "?" - - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- + def ReadRequest( + self, characteristic: BlessGATTCharacteristic, **kwargs + ) -> bytearray: + uuid = str(characteristic._uuid) + if uuid == bc.cFitnessMachineFeatureUUID: + char = bc.cFitnessMachineFeatureName + elif uuid == bc.cIndoorBikeDataUUID: + char = bc.cIndoorBikeDataName + elif uuid == bc.cFitnessMachineStatusUUID: + char = bc.cFitnessMachineStatusName + elif uuid == bc.cFitnessMachineControlPointUUID: + char = bc.cFitnessMachineControlPointName + elif uuid == bc.cSupportedPowerRangeUUID: + char = bc.cSupportedPowerRangeName + elif uuid == bc.cHeartRateMeasurementUUID: + char = bc.cHeartRateMeasurementName + elif uuid == bc.cSteeringAngleUUID: + char = bc.cSteeringAngleName + else: + char = "?" + + # --------------------------------------------------------------------------- # Logging - #--------------------------------------------------------------------------- - self.logfileWrite('clsFTMS_bless.ReadRequest(): characteristic "%s", value = %s' % - (char, HexSpace(characteristic._value))) + # --------------------------------------------------------------------------- + self.logfileWrite( + 'clsFTMS_bless.ReadRequest(): characteristic "%s", value = %s' + % (char, HexSpace(characteristic._value)) + ) return characteristic.value - #------------------------------------------------------------------------------- + # ------------------------------------------------------------------------------- # W r i t e R e q u e s t - #------------------------------------------------------------------------------- + # ------------------------------------------------------------------------------- # Input: characteristic for which the value must be updated # # Function Process the request # # Output: characteristic values modified according request # HasControl, Started - #------------------------------------------------------------------------------- - def WriteRequest(self, - characteristic: BlessGATTCharacteristic, - pvalue: Any, - **kwargs - ): - - value = bytes(pvalue) # at least for struct.unpack() - - uuid = str(characteristic._uuid) - if uuid == bc.cFitnessMachineFeatureUUID: char = bc.cFitnessMachineFeatureName - elif uuid == bc.cIndoorBikeDataUUID: char = bc.cIndoorBikeDataName - elif uuid == bc.cFitnessMachineStatusUUID: char = bc.cFitnessMachineStatusName - elif uuid == bc.cFitnessMachineControlPointUUID: char = bc.cFitnessMachineControlPointName - elif uuid == bc.cSupportedPowerRangeUUID: char = bc.cSupportedPowerRangeName - elif uuid == bc.cHeartRateMeasurementUUID: char = bc.cHeartRateMeasurementUUID - elif uuid == bc.cSteeringAngleUUID: char = bc.cSteeringAngleName - else: char = "?" - - #--------------------------------------------------------------------------- + # ------------------------------------------------------------------------------- + def WriteRequest( + self, characteristic: BlessGATTCharacteristic, pvalue: Any, **kwargs + ): + value = bytes(pvalue) # at least for struct.unpack() + + uuid = str(characteristic._uuid) + if uuid == bc.cFitnessMachineFeatureUUID: + char = bc.cFitnessMachineFeatureName + elif uuid == bc.cIndoorBikeDataUUID: + char = bc.cIndoorBikeDataName + elif uuid == bc.cFitnessMachineStatusUUID: + char = bc.cFitnessMachineStatusName + elif uuid == bc.cFitnessMachineControlPointUUID: + char = bc.cFitnessMachineControlPointName + elif uuid == bc.cSupportedPowerRangeUUID: + char = bc.cSupportedPowerRangeName + elif uuid == bc.cHeartRateMeasurementUUID: + char = bc.cHeartRateMeasurementUUID + elif uuid == bc.cSteeringAngleUUID: + char = bc.cSteeringAngleName + else: + char = "?" + + # --------------------------------------------------------------------------- # Logging - #--------------------------------------------------------------------------- - self.logfileWrite('bleBless: Write request for characteristic "%s", actual value = %s, provided value = %s' % - (char, HexSpace(characteristic.value), HexSpace(value))) + # --------------------------------------------------------------------------- + self.logfileWrite( + 'bleBless: Write request for characteristic "%s", actual value = %s, provided value = %s' + % (char, HexSpace(characteristic.value), HexSpace(value)) + ) - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # MachineControlPoint modifies behaviour; the only write we expect - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- if not uuid == bc.cFitnessMachineControlPointUUID: - self.logfileConsole('bleBless error: Write request on "%s" characteristic is not supported; ignored.' % char) + self.logfileConsole( + 'bleBless error: Write request on "%s" characteristic is not supported; ignored.' + % char + ) else: - OpCode = int(value[0]) # The operation to be performed + OpCode = int(value[0]) # The operation to be performed ResultCode = bc.fmcp_Success # Let's assume it will be OK - UseWorkflow = True # A CTP must request control to be able to - # send further requests and Start before - # changing TargetPower/Grade - # If UseWorkflow == False, these checks - # are disabled. - # - # Funny things is, that HasControl suggests - # a check, but if the CTP proceeds regard- - # less, the Request would be accepted - # Unless there we would know what CTP has - # been granted access... - # - # Now that _FortiusAntServer() detects - # a disconnect, the workflow can be enabled. - #----------------------------------------------------------------------- + UseWorkflow = True # A CTP must request control to be able to + # send further requests and Start before + # changing TargetPower/Grade + # If UseWorkflow == False, these checks + # are disabled. + # + # Funny things is, that HasControl suggests + # a check, but if the CTP proceeds regard- + # less, the Request would be accepted + # Unless there we would know what CTP has + # been granted access... + # + # Now that _FortiusAntServer() detects + # a disconnect, the workflow can be enabled. + # ----------------------------------------------------------------------- # React on requested operation # - check workflow # - accept values and/or modify internal state (HasControl, Started) # - notify client that value is changed (FitnessMachineStatus) # - notify that operation is completed (ResultCode) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- if OpCode == bc.fmcp_RequestControl: if not UseWorkflow or not self.HasControl: self.HasControl = True - self.Started = False + self.Started = False self.logfileWrite("bleBless: HasControl = True") - self.Message = ", Bluetooth interface controlled" + self.Message = ", Bluetooth interface controlled" else: ResultCode = bc.fmcp_ControlNotPermitted - self.logfileConsole("bleBless error: control requested by client, control already granted") + self.logfileConsole( + "bleBless error: control requested by client, control already granted" + ) elif UseWorkflow and not self.HasControl: ResultCode = bc.fmcp_ControlNotPermitted - self.logfileConsole("bleBless error: request received, but client has no control") + self.logfileConsole( + "bleBless error: request received, but client has no control" + ) else: if OpCode == bc.fmcp_StartOrResume: self.Started = True self.logfileWrite("bleBless: Started = True") - self.notifyStartOrResume() # Confirm receipt to client - self.Message = ", Bluetooth interface training started" + self.notifyStartOrResume() # Confirm receipt to client + self.Message = ", Bluetooth interface training started" elif OpCode == bc.fmcp_SetTargetPower: try: - tuple = struct.unpack (bc.little_endian + bc.unsigned_char + bc.unsigned_short, value) + tuple = struct.unpack( + bc.little_endian + bc.unsigned_char + bc.unsigned_short, + value, + ) except Exception as e: - self.logfileConsole("bleBless error: unpack SetTargetPower %e" % e) - #opcode = tuple[0] + self.logfileConsole( + "bleBless error: unpack SetTargetPower %e" % e + ) + # opcode = tuple[0] self.TargetPower = tuple[1] self.TargetGrade = 0 - self.TargetMode = mode_Power + self.TargetMode = mode_Power self.logfileWrite("bleBless: TargetPower = %s" % self.TargetPower) - self.notifySetTargetPower() # Confirm receipt to client - self.Message = ", Bluetooth interface in power mode" + self.notifySetTargetPower() # Confirm receipt to client + self.Message = ", Bluetooth interface in power mode" elif OpCode == bc.fmcp_SetIndoorBikeSimulation: try: - tuple = struct.unpack (bc.little_endian + bc.unsigned_char + bc.short + bc.short + bc.unsigned_char + bc.unsigned_char, value) + tuple = struct.unpack( + bc.little_endian + + bc.unsigned_char + + bc.short + + bc.short + + bc.unsigned_char + + bc.unsigned_char, + value, + ) except Exception as e: - self.logfileConsole("bleBless error: unpack SetIndoorBikeSimulation %s" % e) - #opcode = tuple[0] - self.WindSpeed = round(tuple[1] * 0.001, 3) - self.TargetGrade = round(tuple[2] * 0.01, 2) - self.TargetPower = 0 - self.TargetMode = mode_Grade + self.logfileConsole( + "bleBless error: unpack SetIndoorBikeSimulation %s" % e + ) + # opcode = tuple[0] + self.WindSpeed = round(tuple[1] * 0.001, 3) + self.TargetGrade = round(tuple[2] * 0.01, 2) + self.TargetPower = 0 + self.TargetMode = mode_Grade self.RollingResistance = round(tuple[3] * 0.0001, 4) - self.WindResistance = round(tuple[4] * 0.01, 2) + self.WindResistance = round(tuple[4] * 0.01, 2) self.logfileWrite( - "bleBless: windspeed=%s, TargetGrade=%s, RollingResistance=%s, WindResistance=%s" % - (self.WindSpeed, self.TargetGrade, self.RollingResistance, self.WindResistance)) - self.notifySetIndoorBikeSimulation() # Confirm receipt to client - self.Message = ", Bluetooth interface in grade mode" + "bleBless: windspeed=%s, TargetGrade=%s, RollingResistance=%s, WindResistance=%s" + % ( + self.WindSpeed, + self.TargetGrade, + self.RollingResistance, + self.WindResistance, + ) + ) + self.notifySetIndoorBikeSimulation() # Confirm receipt to client + self.Message = ", Bluetooth interface in grade mode" elif OpCode == bc.fmcp_StopOrPause: self.Started = False self.logfileWrite("bleBless: Started = False") - self.notifyStopOrPause() # Confirm receipt to client - self.Message = ", Bluetooth interface training stopped" + self.notifyStopOrPause() # Confirm receipt to client + self.Message = ", Bluetooth interface training stopped" elif OpCode == bc.fmcp_Reset: - self.Started = False + self.Started = False self.HasControl = False self.logfileWrite("bleBless: HasControl = False") - self.notifyReset() # Confirm receipt to client - self.Message = ", Bluetooth interface open" + self.notifyReset() # Confirm receipt to client + self.Message = ", Bluetooth interface open" else: self.logfileConsole("bleBless error: Unknown OpCode %s" % OpCode) ResultCode = bc.fmcp_ControlNotPermitted - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Response: - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- ResponseCode = 0x80 - info = struct.pack(bc.little_endian + bc.unsigned_char * 3, ResponseCode, OpCode, ResultCode) + info = struct.pack( + bc.little_endian + bc.unsigned_char * 3, + ResponseCode, + OpCode, + ResultCode, + ) characteristic.value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineControlPointUUID) + self.BlessServer.update_value( + bc.sFitnessMachineUUID, bc.cFitnessMachineControlPointUUID + ) if False: - self.logfileWrite("bleBless: New value for characteristic %s = %s" % (char, HexSpace(info))) + self.logfileWrite( + "bleBless: New value for characteristic %s = %s" + % (char, HexSpace(info)) + ) # -------------------------------------------------------------------------- # After that a characteristic is written, it must also be confirmed through @@ -645,40 +783,89 @@ def WriteRequest(self, # -------------------------------------------------------------------------- def notifyStartOrResume(self): self.logfileWrite("bleBless.notifyStartOrResume()") - info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_FitnessMachineStartedOrResumedByUser) - self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID) + info = struct.pack( + bc.little_endian + bc.unsigned_char, + bc.fms_FitnessMachineStartedOrResumedByUser, + ) + characteristic = self.BlessServer.get_characteristic( + bc.cFitnessMachineStatusUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cFitnessMachineStatusUUID, + ) def notifySetTargetPower(self): self.logfileWrite("bleBless.notifySetTargetPower()") - info = struct.pack(bc.little_endian + bc.unsigned_char + bc.unsigned_short, bc.fms_TargetPowerChanged, self.TargetPower) - self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID) + info = struct.pack( + bc.little_endian + bc.unsigned_char + bc.unsigned_short, + bc.fms_TargetPowerChanged, + self.TargetPower, + ) + characteristic = self.BlessServer.get_characteristic( + bc.cFitnessMachineStatusUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cFitnessMachineStatusUUID, + ) def notifySetIndoorBikeSimulation(self): self.logfileWrite("bleBless.notifySetIndoorBikeSimulation()") - - windSpeed = int(self.WindSpeed / 0.001 ) - grade = int(self.TargetGrade / 0.01 ) - crr = int(self.RollingResistance / 0.0001) - cw = int(self.WindResistance / 0.01 ) - - info = struct.pack(bc.little_endian + bc.unsigned_char + bc.short * 2 + bc.unsigned_char * 2, - bc.fms_IndoorBikeSimulationParametersChanged, windSpeed, grade, crr, cw) - self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID) + windSpeed = int(self.WindSpeed / 0.001) + grade = int(self.TargetGrade / 0.01) + crr = int(self.RollingResistance / 0.0001) + cw = int(self.WindResistance / 0.01) + info = struct.pack( + bc.little_endian + bc.unsigned_char + bc.short * 2 + bc.unsigned_char * 2, + bc.fms_IndoorBikeSimulationParametersChanged, + windSpeed, + grade, + crr, + cw, + ) + characteristic = self.BlessServer.get_characteristic( + bc.cFitnessMachineStatusUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cFitnessMachineStatusUUID, + ) def notifyStopOrPause(self): self.logfileWrite("bleBless.notifyStopOrPause()") - info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_FitnessMachineStoppedOrPausedByUser) - self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID) + info = struct.pack( + bc.little_endian + bc.unsigned_char, + bc.fms_FitnessMachineStoppedOrPausedByUser, + ) + characteristic = self.BlessServer.get_characteristic( + bc.cFitnessMachineStatusUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cFitnessMachineStatusUUID, + ) def notifyReset(self): self.logfileWrite("bleBless.notifyReset()") info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_Reset) - self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID).value = info - self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID) + characteristic = self.BlessServer.get_characteristic( + bc.cFitnessMachineStatusUUID + ) + self.rate_limited_call(setattr, characteristic, "value", info) + self.rate_limited_call( + self.BlessServer.update_value, + bc.sFitnessMachineUUID, + bc.cFitnessMachineStatusUUID, + ) # ------------------------------------------------------------------------------ # S i m u l a t o r @@ -692,27 +879,33 @@ def notifyReset(self): # SetAthleteData() and/or SetTrainerData() # Refresh() # use class-attributes (TargetMode, TargetPower, ...) - # + # # # Output The interface is used, no further output. # ------------------------------------------------------------------------------ def Simulator(self): - self.logfileConsole("---------------------------------------------------------------------------------------") + self.logfileConsole( + "---------------------------------------------------------------------------------------" + ) self.logfileConsole("FortiusAnt simulated trainer is active") - self.logfileConsole("Start a training in a CTP; 5 seconds after completing the training, simulation will end") - self.logfileConsole("---------------------------------------------------------------------------------------") + self.logfileConsole( + "Start a training in a CTP; 5 seconds after completing the training, simulation will end" + ) + self.logfileConsole( + "---------------------------------------------------------------------------------------" + ) - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # FortiusAntServer is now active # read/write through read_Request() and write_Request() functions - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- ClientWasConnected = False i = 5 while i: - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Stop 5 seconds after a client is disconnected # In the meantime, Start, Power/Grade, Stop is expected.... - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- if self.ClientConnected: ClientWasConnected = True @@ -720,39 +913,39 @@ def Simulator(self): i -= 1 pass - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Update trainer info every second - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- time.sleep(1) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Create some fancy data - #----------------------------------------------------------------------- - s = int(time.time() % 60) # Seconds - HeartRate = int(000 + s) - Cadence = int(100 + s ) - CurrentSpeed = round(200 + s,1) - CurrentPower = int(300 + s) - SteeringAngle= s - 30 # -30 ... 30 - - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- + s = int(time.time() % 60) # Seconds + HeartRate = int(000 + s) + Cadence = int(100 + s) + CurrentSpeed = round(200 + s, 1) + CurrentPower = int(300 + s) + SteeringAngle = s - 30 # -30 ... 30 + + # ----------------------------------------------------------------------- # Update actual values of Athlete and Trainer and Steering - #----------------------------------------------------------------------- - self.SetAthleteData (HeartRate) - self.SetTrainerData (CurrentSpeed, Cadence, CurrentPower) + # ----------------------------------------------------------------------- + self.SetAthleteData(HeartRate) + self.SetTrainerData(CurrentSpeed, Cadence, CurrentPower) self.SetSteeringAngle(SteeringAngle) - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Allow class to take care that attributes are accurate - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- self.Refresh() - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # Here the real application can take action, usually adjust the # resistance of the bicycle and/or display the target Mode/Power/Grade. - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- # All we do is show current status / target # --> Four variables are filled by ourselves, which in reality comes from # the athlete's HeartRateMonitor and the fitness bike (see above). @@ -761,37 +954,52 @@ def Simulator(self): # # --> TargetMode, TargetPower, TargetGrade are provided by the Cycling # Trining Program (CTP), the client to this fitness machine. - #----------------------------------------------------------------------- + # ----------------------------------------------------------------------- print( - "Client=%-5s HasControl=%-5s Started=%-5s TargetPower=%4s TargetGrade=%-6s Speed=%3s Cadence=%3s, Power=%3s, HeartRate=%3s Angle=%5s" % - (self.ClientConnected, self.HasControl, self.Started, self.TargetPower, self.TargetGrade, self.CurrentSpeed, self.Cadence, self.CurrentPower, self.HeartRate, self.SteeringAngle)) + "Client=%-5s HasControl=%-5s Started=%-5s TargetPower=%4s TargetGrade=%-6s Speed=%3s Cadence=%3s, Power=%3s, HeartRate=%3s Angle=%5s" + % ( + self.ClientConnected, + self.HasControl, + self.Started, + self.TargetPower, + self.TargetGrade, + self.CurrentSpeed, + self.Cadence, + self.CurrentPower, + self.HeartRate, + self.SteeringAngle, + ) + ) self.logfileConsole("FortiusAnt simulated trainer is stopped") self.logfileConsole("---------------------------------------") + # ============================================================================== # Main program # ============================================================================== if __name__ == "__main__": - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Initialization - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- if not BlessExample: debug.activate(debug.Ble | debug.logging_DEBUG) logfile.Open() - print('FTMS server in FortiusAnt context') + print("FTMS server in FortiusAnt context") else: - logging.basicConfig(level=logging.DEBUG) # pylint:disable=invalid-name,used-before-assignment,undefined-variable + logging.basicConfig( + level=logging.DEBUG + ) # pylint:disable=invalid-name,used-before-assignment,undefined-variable logger = logging.getLogger(name=__name__) - print('FTMS server in bless/example context') + print("FTMS server in bless/example context") print("bleBless started") print("----------------") - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # This is what it's all about - #--------------------------------------------------------------------------- - FortiusAntServer = clsFTMS_bless(True) # clv.ble + # --------------------------------------------------------------------------- + FortiusAntServer = clsFTMS_bless(True) # clv.ble print("Message=%s" % FortiusAntServer.Message) b = FortiusAntServer.Open() @@ -802,9 +1010,9 @@ def Simulator(self): FortiusAntServer.Close() print("Message=%s" % FortiusAntServer.Message) - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- # Termination - #--------------------------------------------------------------------------- + # --------------------------------------------------------------------------- print("bleBless ended") """ @@ -823,7 +1031,7 @@ def Simulator(self): Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=201 Cadence=101, Power=301, HeartRate= 1 Angle= -29 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=202 Cadence=102, Power=302, HeartRate= 2 Angle= -28 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=203 Cadence=103, Power=303, HeartRate= 3 Angle= -27 -... + Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=258 Cadence=158, Power=358, HeartRate= 58 Angle= 28 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=259 Cadence=159, Power=359, HeartRate= 59 Angle= 29 Client=True HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=200 Cadence=100, Power=300, HeartRate= 0 Angle= -30 @@ -915,7 +1123,7 @@ def Simulator(self): Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 E8 41 Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 F0 C1 Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 E8 C1 -... + Disconnecting from 4F:69:66:13:7A:2A (LT-ENTERTAIN) Disconnected from 4F:69:66:13:7A:2A (LT-ENTERTAIN) @@ -968,4 +1176,4 @@ def Simulator(self): |--347b0032-7635-408b-8918-8ff3949ce592: Indicate == cSteeringTxUUID |------00002902-0000-1000-8000-00805f9b34fb: -""" \ No newline at end of file +""" From a1cf5147df3f662f55c2f11950d946d9bdba7a05 Mon Sep 17 00:00:00 2001 From: AfromD <34109930+AfromD@users.noreply.github.com> Date: Wed, 19 Mar 2025 18:17:18 +0100 Subject: [PATCH 2/2] Bless call pacing, adjusted formatting --- pythoncode/bleBless.py | 861 ++++++++++++++++------------------------- 1 file changed, 342 insertions(+), 519 deletions(-) diff --git a/pythoncode/bleBless.py b/pythoncode/bleBless.py index 0e9bc6d6..0a2f4ff0 100644 --- a/pythoncode/bleBless.py +++ b/pythoncode/bleBless.py @@ -1,6 +1,6 @@ -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- # Description -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- # The bless library enables python programs to access Bluetooth Low Energy # as a server. This enables FortiusAnt to create a FTMS (FTMS FiTness Machine Server) # and communicate with a CTP (which is the client, refer bleBleak.py). @@ -8,12 +8,12 @@ # FortiusAnt uses class clsFTMS_bless, based upon clsBleServer # This file can be executed and then a simulator is started for demo/test purpose. # -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- # Author https://github.com/WouterJD # wouter.dubbeldam@xs4all.nl -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- # Version info -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- __version__ = "2022-08-10" # 2022-08-10 Steering implemented according marcoveeneman and switchable's code # 2022-04-12 TargetMode is initially None, so that FortiusAnt knowns that no @@ -33,51 +33,51 @@ # examples\gattserver.py # Example for a BLE 4.0 Server using a GATT dictionary of # characteristics -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- import struct import time from typing import Any, Dict from bless import ( - BlessServer, - BlessGATTCharacteristic, - GATTCharacteristicProperties, - GATTAttributePermissions, -) + BlessServer, + BlessGATTCharacteristic, + GATTCharacteristicProperties, + GATTAttributePermissions + ) if True: BlessExample = False - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Import in the FortiusAnt context - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- import debug import logfile - from constants import mode_Power, mode_Grade, UseBluetooth - from logfile import HexSpace - from bleBlessClass import clsBleServer - import bleConstants as bc + from constants import mode_Power, mode_Grade, UseBluetooth + from logfile import HexSpace + from bleBlessClass import clsBleServer + import bleConstants as bc else: BlessExample = True - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Import and Constants for bless example context - # --------------------------------------------------------------------------- - from FTMSserverClass import clsBleServer - import FTMSconstants as bc - from FTMSconstants import HexSpace + #--------------------------------------------------------------------------- + from FTMSserverClass import clsBleServer + import FTMSconstants as bc + from FTMSconstants import HexSpace import logging - mode_Power = 1 # Target Power - mode_Grade = 2 # Target Resistance - UseBluetooth = True + mode_Power = 1 # Target Power + mode_Grade = 2 # Target Resistance + UseBluetooth = True -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- # Define the server structure with services and characteristics # 2022-02-22 Note, see https://github.com/kevincar/bless/issues/67 # Value should be with a capital, but currently bless uses "value". # Therefore both Value/value are present, to avoid future issues. -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- """ Is not allowed to be added in the GATT definition: failed to create entry in database @@ -160,139 +160,109 @@ FitnessMachineGatt: Dict = { bc.sFitnessMachineUUID: { bc.cFitnessMachineFeatureUUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": bc.fmf_Info, # b'\x02\x40\x00\x00\x08\x20\x00\x00', - "value": bc.fmf_Info, - "Description": bc.cFitnessMachineFeatureName, + "Properties": (GATTCharacteristicProperties.read), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": bc.fmf_Info, # b'\x02\x40\x00\x00\x08\x20\x00\x00', + "value": bc.fmf_Info, + "Description": bc.cFitnessMachineFeatureName }, bc.cIndoorBikeDataUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": bc.ibd_Info, # Instantaneous Cadence, Power, HeartRate - "value": bc.ibd_Info, - "Description": bc.cIndoorBikeDataName, + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": bc.ibd_Info, # Instantaneous Cadence, Power, HeartRate + "value": bc.ibd_Info, + "Description": bc.cIndoorBikeDataName }, bc.cFitnessMachineStatusUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "value": b"\x00\x00", # Status as "sent" to Cycling Training Program - "Value": b"\x00\x00", - "Description": bc.cFitnessMachineStatusName, + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "value": b'\x00\x00', # Status as "sent" to Cycling Training Program + "Value": b'\x00\x00', + "Description": bc.cFitnessMachineStatusName }, bc.cFitnessMachineControlPointUUID: { - "Properties": ( - GATTCharacteristicProperties.write - | GATTCharacteristicProperties.indicate - ), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\x00\x00", # Commands as received from Cycling Training Program - "value": b"\x00\x00", - "Description": bc.cFitnessMachineControlPointName, + "Properties": (GATTCharacteristicProperties.write | GATTCharacteristicProperties.indicate), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\x00\x00', # Commands as received from Cycling Training Program + "value": b'\x00\x00', + "Description": bc.cFitnessMachineControlPointName }, bc.cSupportedPowerRangeUUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": bc.spr_Info, # Static additional properties of the FTMS - # b'\x00\x00\xe8\x03\x01\x00' - # min=0, max=1000, incr=1 - # ==> 0x0000 0x03e8 0x0001 ==> 0x0000 0xe803 0x0100 - "value": bc.spr_Info, - "Description": bc.cSupportedPowerRangeName, - }, + "Properties": (GATTCharacteristicProperties.read), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": bc.spr_Info, # Static additional properties of the FTMS + # b'\x00\x00\xe8\x03\x01\x00' + # min=0, max=1000, incr=1 + # ==> 0x0000 0x03e8 0x0001 ==> 0x0000 0xe803 0x0100 + "value": bc.spr_Info, + "Description": bc.cSupportedPowerRangeName + } }, bc.sHeartRateUUID: { bc.cHeartRateMeasurementUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": bc.hrm_Info, - "value": bc.hrm_Info, - "Description": bc.cHeartRateMeasurementName, + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": bc.hrm_Info, + "value": bc.hrm_Info, + "Description": bc.cHeartRateMeasurementName } }, bc.sSteeringUUID: { bc.cSteeringUnknown1UUID: { - "Properties": (GATTCharacteristicProperties.write), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\x00\x00", - "value": b"\x00\x00", - "Description": bc.cSteeringUnknown1Name, + "Properties": (GATTCharacteristicProperties.write), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\x00\x00', + "value": b'\x00\x00', + "Description": bc.cSteeringUnknown1Name }, bc.cSteeringUnknown2UUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\xff", - "value": b"\xff", - "Description": bc.cSteeringUnknown2Name, + "Properties": (GATTCharacteristicProperties.read), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\xff', + "value": b'\xff', + "Description": bc.cSteeringUnknown2Name }, bc.cSteeringUnknown3UUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\x00", - "value": b"\x00", - "Description": bc.cSteeringUnknown3Name, + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\x00', + "value": b'\x00', + "Description": bc.cSteeringUnknown3Name }, bc.cSteeringUnknown4UUID: { - "Properties": (GATTCharacteristicProperties.read), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\xff", - "value": b"\xff", - "Description": bc.cSteeringUnknown4Name, + "Properties": (GATTCharacteristicProperties.read), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\xff', + "value": b'\xff', + "Description": bc.cSteeringUnknown4Name }, bc.cSteeringAngleUUID: { - "Properties": (GATTCharacteristicProperties.notify), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": bc.angle_Info, - "value": bc.angle_Info, - "Description": bc.cSteeringAngleName, + "Properties": (GATTCharacteristicProperties.notify), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": bc.angle_Info, + "value": bc.angle_Info, + "Description": bc.cSteeringAngleName }, bc.cSteeringTxUUID: { - "Properties": (GATTCharacteristicProperties.indicate), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\x00", - "value": b"\x00", - "Description": bc.cSteeringTxName, + "Properties": (GATTCharacteristicProperties.indicate), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\x00', + "value": b'\x00', + "Description": bc.cSteeringTxName }, bc.cSteeringRxUUID: { - "Properties": (GATTCharacteristicProperties.write), - "Permissions": ( - GATTAttributePermissions.readable | GATTAttributePermissions.writeable - ), - "Value": b"\x00", - "value": b"\x00", - "Description": bc.cSteeringRxName, - }, - }, + "Properties": (GATTCharacteristicProperties.write), + "Permissions": (GATTAttributePermissions.readable | GATTAttributePermissions.writeable), + "Value": b'\x00', + "value": b'\x00', + "Description": bc.cSteeringRxName + } + } } - -# ------------------------------------------------------------------------------- -# c l s F T M S _ b l e s s -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- +# c l s F T M S _ b l e s s +#------------------------------------------------------------------------------- # Class to create an FiTnessMachineServer, based upon bless # # User methods: @@ -306,41 +276,41 @@ # User attributes: # See parent class AND # See below -# ------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- class clsFTMS_bless(clsBleServer): - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # CTP data, received through WriteRequest() and sent by client - # --------------------------------------------------------------------------- - TargetMode = None # No target received; and then: - # either mode_Power or mode_Grade - TargetGrade = 0 # % - TargetPower = 100 # Watt - - WindResistance = 0 - WindSpeed = 0 - DraftingFactor = 1 # Default since not supplied - RollingResistance = 0 - - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- + TargetMode = None # No target received; and then: + # either mode_Power or mode_Grade + TargetGrade = 0 # % + TargetPower = 100 # Watt + + WindResistance = 0 + WindSpeed = 0 + DraftingFactor = 1 # Default since not supplied + RollingResistance = 0 + + #--------------------------------------------------------------------------- # data provided by SetAthleteData() as called by application - # --------------------------------------------------------------------------- - HeartRate = 0 + #--------------------------------------------------------------------------- + HeartRate = 0 - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # data provided by SetTrainerData() as called by application - # --------------------------------------------------------------------------- - CurrentSpeed = 0 # km/hour - Cadence = 0 # /minute - CurrentPower = 0 # Watt - SteeringAngle = 0 # Steering is always present, - # regardless the -S command-line setting - # If no steering, value is zero. - - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- + CurrentSpeed = 0 # km/hour + Cadence = 0 # /minute + CurrentPower = 0 # Watt + SteeringAngle = 0 # Steering is always present, + # regardless the -S command-line setting + # If no steering, value is zero. + + #--------------------------------------------------------------------------- # Internal workflow control data - # --------------------------------------------------------------------------- - HasControl = False # CTP is controlling the FTMS - Started = False # A CTP training is started + #--------------------------------------------------------------------------- + HasControl = False # CTP is controlling the FTMS + Started = False # A CTP training is started # -------------------------------------------------------------------------- # _ _ i n i t _ _ @@ -356,9 +326,9 @@ def __init__(self, activate): if UseBluetooth and activate: super().__init__("FortiusAntTrainer", FitnessMachineGatt) self.last_call_time = 0 - self.min_interval = 1 / 500 # 5000 times per second + self.min_interval = 1 / 500 # 500 times per second else: - pass # Data structure is created, no actions + pass # Data structure is created, no actions # -------------------------------------------------------------------------- # Ratelimitedcall @@ -391,108 +361,82 @@ def rate_limited_call(self, func, *args, **kwargs): # Output OK = False # -------------------------------------------------------------------------- def SetAthleteData(self, HeartRate): - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Logging - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- self.logfileWrite("clsFTMS_bless.SetAthleteData(%s)" % HeartRate) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Remember provided data - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- self.HeartRate = HeartRate - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Update heartrate in FTMS - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- if self.OK: flags = 0 - h = int(self.HeartRate) & 0xFF + h = int(self.HeartRate) & 0xff # Avoid value anomalities info = struct.pack(bc.little_endian + bc.unsigned_char * 2, flags, h) - characteristic = self.BlessServer.get_characteristic( - bc.cHeartRateMeasurementUUID - ) + characteristic = self.BlessServer.get_characteristic(bc.cHeartRateMeasurementUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sHeartRateUUID, - bc.cHeartRateMeasurementUUID, - ) - + self.rate_limited_call(self.BlessServer.update_value, bc.sHeartRateUUID, bc.cHeartRateMeasurementUUID,) else: - self.logfileConsole( - "clsFTMS_bless.SetAthleteData() error, interface not open" - ) + self.logfileConsole("clsFTMS_bless.SetAthleteData() error, interface not open") def SetTrainerData(self, CurrentSpeed, Cadence, CurrentPower): - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Logging - # ----------------------------------------------------------------------- - self.logfileWrite( - "clsFTMS_bless.SetTrainerData(%s, %s, %s)" - % (CurrentSpeed, Cadence, CurrentPower) - ) + #----------------------------------------------------------------------- + self.logfileWrite("clsFTMS_bless.SetTrainerData(%s, %s, %s)" % (CurrentSpeed, Cadence, CurrentPower)) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Remember provided data - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- self.CurrentSpeed = CurrentSpeed - self.Cadence = Cadence + self.Cadence = Cadence self.CurrentPower = CurrentPower - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Update trainer status in FTMS # Note that: Speed always present UNLESS...) # HeartRate not transmitted, is not used. - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- if self.OK: - flags = ( - bc.ibd_InstantaneousCadencePresent | bc.ibd_InstantaneousPowerPresent - ) - s = int(self.CurrentSpeed * 100) & 0xFFFF - c = int(self.Cadence * 2) & 0xFFFF - p = int(self.CurrentPower) & 0xFFFF - info = struct.pack(bc.little_endian + bc.unsigned_short * 4, flags, s, c, p) + flags = (bc.ibd_InstantaneousCadencePresent | bc.ibd_InstantaneousPowerPresent) + s = int(self.CurrentSpeed * 100) & 0xffff # Avoid value anomalities + c = int(self.Cadence * 2) & 0xffff # Avoid value anomalities + p = int(self.CurrentPower) & 0xffff # Avoid value anomalities + info = struct.pack (bc.little_endian + bc.unsigned_short * 4, flags, s, c, p) characteristic = self.BlessServer.get_characteristic(bc.cIndoorBikeDataUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cIndoorBikeDataUUID, - ) - + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cIndoorBikeDataUUID,) else: - self.logfileConsole( - "clsFTMS_bless.SetTrainerData() error, interface not open" - ) - + self.logfileConsole("clsFTMS_bless.SetTrainerData() error, interface not open") + def SetSteeringAngle(self, SteeringAngle): - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Logging - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- self.logfileWrite("clsFTMS_bless.SetSteeringAngle(%s)" % SteeringAngle) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Remember provided data - # ----------------------------------------------------------------------- - self.SteeringAngle = SteeringAngle + #----------------------------------------------------------------------- + self.SteeringAngle = SteeringAngle - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Update angle in steering - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- if self.OK: - a = SteeringAngle - info = struct.pack(bc.little_endian + bc.float, a) + a = SteeringAngle # Avoid value anomalities here (if needed) + info = struct.pack (bc.little_endian + bc.float, a) characteristic = self.BlessServer.get_characteristic(bc.cSteeringAngleUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, bc.sSteeringUUID, bc.cSteeringAngleUUID - ) - + self.rate_limited_call(self.BlessServer.update_value, bc.sSteeringUUID, bc.cSteeringAngleUUID) else: - self.logfileConsole( - "clsFTMS_bless.SetSteeringAngle() error, interface not open" - ) + self.logfileConsole("clsFTMS_bless.SetSteeringAngle() error, interface not open") # -------------------------------------------------------------------------- # C l i e n t D i s c o n n e c t e d @@ -506,7 +450,7 @@ def SetSteeringAngle(self, SteeringAngle): # -------------------------------------------------------------------------- def ClientDisconnected(self): self.HasControl = False - self.Started = False + self.Started = False # -------------------------------------------------------------------------- # R e f r e s h @@ -524,258 +468,202 @@ def ClientDisconnected(self): def Refresh(self): return self.OK - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # l o g f i l e W r i t e / C o n s o l e / T r a c e b a c k - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Input: text to be written to logfile # # Function Use our own standard logging functions # # Output: none - # --------------------------------------------------------------------------- - if not BlessExample: # As used in the FortiusAnt context - + #--------------------------------------------------------------------------- + if not BlessExample: # As used in the FortiusAnt context def logfileWrite(self, message): - if debug.on(debug.Ble): - logfile.Write(message) + if debug.on(debug.Ble): logfile.Write(message) def logfileConsole(self, message): logfile.Console(message) def logfileTraceback(self, exception): - if debug.on(debug.Ble): - logfile.Traceback(exception) + if debug.on(debug.Ble): logfile.Traceback(exception) - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # R e a d R e q u e s t - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Input: characteristic for which the value is requested # # Function Return the requested value, without further processing # Replaces parent class because of enhanced logging # # Output: characteristic.value - # --------------------------------------------------------------------------- - def ReadRequest( - self, characteristic: BlessGATTCharacteristic, **kwargs - ) -> bytearray: - uuid = str(characteristic._uuid) - if uuid == bc.cFitnessMachineFeatureUUID: - char = bc.cFitnessMachineFeatureName - elif uuid == bc.cIndoorBikeDataUUID: - char = bc.cIndoorBikeDataName - elif uuid == bc.cFitnessMachineStatusUUID: - char = bc.cFitnessMachineStatusName - elif uuid == bc.cFitnessMachineControlPointUUID: - char = bc.cFitnessMachineControlPointName - elif uuid == bc.cSupportedPowerRangeUUID: - char = bc.cSupportedPowerRangeName - elif uuid == bc.cHeartRateMeasurementUUID: - char = bc.cHeartRateMeasurementName - elif uuid == bc.cSteeringAngleUUID: - char = bc.cSteeringAngleName - else: - char = "?" - - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- + def ReadRequest(self, + characteristic: BlessGATTCharacteristic, + **kwargs + ) -> bytearray: + + uuid = str(characteristic._uuid) + if uuid == bc.cFitnessMachineFeatureUUID: char = bc.cFitnessMachineFeatureName + elif uuid == bc.cIndoorBikeDataUUID: char = bc.cIndoorBikeDataName + elif uuid == bc.cFitnessMachineStatusUUID: char = bc.cFitnessMachineStatusName + elif uuid == bc.cFitnessMachineControlPointUUID: char = bc.cFitnessMachineControlPointName + elif uuid == bc.cSupportedPowerRangeUUID: char = bc.cSupportedPowerRangeName + elif uuid == bc.cHeartRateMeasurementUUID: char = bc.cHeartRateMeasurementName + elif uuid == bc.cSteeringAngleUUID: char = bc.cSteeringAngleName + else: char = "?" + + #--------------------------------------------------------------------------- # Logging - # --------------------------------------------------------------------------- - self.logfileWrite( - 'clsFTMS_bless.ReadRequest(): characteristic "%s", value = %s' - % (char, HexSpace(characteristic._value)) - ) + #--------------------------------------------------------------------------- + self.logfileWrite('clsFTMS_bless.ReadRequest(): characteristic "%s", value = %s' % + (char, HexSpace(characteristic._value))) return characteristic.value - # ------------------------------------------------------------------------------- + #------------------------------------------------------------------------------- # W r i t e R e q u e s t - # ------------------------------------------------------------------------------- + #------------------------------------------------------------------------------- # Input: characteristic for which the value must be updated # # Function Process the request # # Output: characteristic values modified according request # HasControl, Started - # ------------------------------------------------------------------------------- - def WriteRequest( - self, characteristic: BlessGATTCharacteristic, pvalue: Any, **kwargs - ): - value = bytes(pvalue) # at least for struct.unpack() - - uuid = str(characteristic._uuid) - if uuid == bc.cFitnessMachineFeatureUUID: - char = bc.cFitnessMachineFeatureName - elif uuid == bc.cIndoorBikeDataUUID: - char = bc.cIndoorBikeDataName - elif uuid == bc.cFitnessMachineStatusUUID: - char = bc.cFitnessMachineStatusName - elif uuid == bc.cFitnessMachineControlPointUUID: - char = bc.cFitnessMachineControlPointName - elif uuid == bc.cSupportedPowerRangeUUID: - char = bc.cSupportedPowerRangeName - elif uuid == bc.cHeartRateMeasurementUUID: - char = bc.cHeartRateMeasurementUUID - elif uuid == bc.cSteeringAngleUUID: - char = bc.cSteeringAngleName - else: - char = "?" - - # --------------------------------------------------------------------------- + #------------------------------------------------------------------------------- + def WriteRequest(self, + characteristic: BlessGATTCharacteristic, + pvalue: Any, + **kwargs + ): + + value = bytes(pvalue) # at least for struct.unpack() + + uuid = str(characteristic._uuid) + if uuid == bc.cFitnessMachineFeatureUUID: char = bc.cFitnessMachineFeatureName + elif uuid == bc.cIndoorBikeDataUUID: char = bc.cIndoorBikeDataName + elif uuid == bc.cFitnessMachineStatusUUID: char = bc.cFitnessMachineStatusName + elif uuid == bc.cFitnessMachineControlPointUUID: char = bc.cFitnessMachineControlPointName + elif uuid == bc.cSupportedPowerRangeUUID: char = bc.cSupportedPowerRangeName + elif uuid == bc.cHeartRateMeasurementUUID: char = bc.cHeartRateMeasurementUUID + elif uuid == bc.cSteeringAngleUUID: char = bc.cSteeringAngleName + else: char = "?" + + #--------------------------------------------------------------------------- # Logging - # --------------------------------------------------------------------------- - self.logfileWrite( - 'bleBless: Write request for characteristic "%s", actual value = %s, provided value = %s' - % (char, HexSpace(characteristic.value), HexSpace(value)) - ) + #--------------------------------------------------------------------------- + self.logfileWrite('bleBless: Write request for characteristic "%s", actual value = %s, provided value = %s' % + (char, HexSpace(characteristic.value), HexSpace(value))) - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # MachineControlPoint modifies behaviour; the only write we expect - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- if not uuid == bc.cFitnessMachineControlPointUUID: - self.logfileConsole( - 'bleBless error: Write request on "%s" characteristic is not supported; ignored.' - % char - ) + self.logfileConsole('bleBless error: Write request on "%s" characteristic is not supported; ignored.' % char) else: - OpCode = int(value[0]) # The operation to be performed + OpCode = int(value[0]) # The operation to be performed ResultCode = bc.fmcp_Success # Let's assume it will be OK - UseWorkflow = True # A CTP must request control to be able to - # send further requests and Start before - # changing TargetPower/Grade - # If UseWorkflow == False, these checks - # are disabled. - # - # Funny things is, that HasControl suggests - # a check, but if the CTP proceeds regard- - # less, the Request would be accepted - # Unless there we would know what CTP has - # been granted access... - # - # Now that _FortiusAntServer() detects - # a disconnect, the workflow can be enabled. - # ----------------------------------------------------------------------- + UseWorkflow = True # A CTP must request control to be able to + # send further requests and Start before + # changing TargetPower/Grade + # If UseWorkflow == False, these checks + # are disabled. + # + # Funny things is, that HasControl suggests + # a check, but if the CTP proceeds regard- + # less, the Request would be accepted + # Unless there we would know what CTP has + # been granted access... + # + # Now that _FortiusAntServer() detects + # a disconnect, the workflow can be enabled. + #----------------------------------------------------------------------- # React on requested operation # - check workflow # - accept values and/or modify internal state (HasControl, Started) # - notify client that value is changed (FitnessMachineStatus) # - notify that operation is completed (ResultCode) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- if OpCode == bc.fmcp_RequestControl: if not UseWorkflow or not self.HasControl: self.HasControl = True - self.Started = False + self.Started = False self.logfileWrite("bleBless: HasControl = True") - self.Message = ", Bluetooth interface controlled" + self.Message = ", Bluetooth interface controlled" else: ResultCode = bc.fmcp_ControlNotPermitted - self.logfileConsole( - "bleBless error: control requested by client, control already granted" - ) + self.logfileConsole("bleBless error: control requested by client, control already granted") elif UseWorkflow and not self.HasControl: ResultCode = bc.fmcp_ControlNotPermitted - self.logfileConsole( - "bleBless error: request received, but client has no control" - ) + self.logfileConsole("bleBless error: request received, but client has no control") else: if OpCode == bc.fmcp_StartOrResume: self.Started = True self.logfileWrite("bleBless: Started = True") - self.notifyStartOrResume() # Confirm receipt to client - self.Message = ", Bluetooth interface training started" + self.notifyStartOrResume() # Confirm receipt to client + self.Message = ", Bluetooth interface training started" elif OpCode == bc.fmcp_SetTargetPower: try: - tuple = struct.unpack( - bc.little_endian + bc.unsigned_char + bc.unsigned_short, - value, - ) + tuple = struct.unpack (bc.little_endian + bc.unsigned_char + bc.unsigned_short, value) except Exception as e: - self.logfileConsole( - "bleBless error: unpack SetTargetPower %e" % e - ) - # opcode = tuple[0] + self.logfileConsole("bleBless error: unpack SetTargetPower %e" % e) + #opcode = tuple[0] self.TargetPower = tuple[1] self.TargetGrade = 0 - self.TargetMode = mode_Power + self.TargetMode = mode_Power self.logfileWrite("bleBless: TargetPower = %s" % self.TargetPower) - self.notifySetTargetPower() # Confirm receipt to client - self.Message = ", Bluetooth interface in power mode" + self.notifySetTargetPower() # Confirm receipt to client + self.Message = ", Bluetooth interface in power mode" elif OpCode == bc.fmcp_SetIndoorBikeSimulation: try: - tuple = struct.unpack( - bc.little_endian - + bc.unsigned_char - + bc.short - + bc.short - + bc.unsigned_char - + bc.unsigned_char, - value, - ) + tuple = struct.unpack (bc.little_endian + bc.unsigned_char + bc.short + bc.short + bc.unsigned_char + bc.unsigned_char, value) except Exception as e: - self.logfileConsole( - "bleBless error: unpack SetIndoorBikeSimulation %s" % e - ) - # opcode = tuple[0] - self.WindSpeed = round(tuple[1] * 0.001, 3) - self.TargetGrade = round(tuple[2] * 0.01, 2) - self.TargetPower = 0 - self.TargetMode = mode_Grade + self.logfileConsole("bleBless error: unpack SetIndoorBikeSimulation %s" % e) + #opcode = tuple[0] + self.WindSpeed = round(tuple[1] * 0.001, 3) + self.TargetGrade = round(tuple[2] * 0.01, 2) + self.TargetPower = 0 + self.TargetMode = mode_Grade self.RollingResistance = round(tuple[3] * 0.0001, 4) - self.WindResistance = round(tuple[4] * 0.01, 2) + self.WindResistance = round(tuple[4] * 0.01, 2) self.logfileWrite( - "bleBless: windspeed=%s, TargetGrade=%s, RollingResistance=%s, WindResistance=%s" - % ( - self.WindSpeed, - self.TargetGrade, - self.RollingResistance, - self.WindResistance, - ) - ) - self.notifySetIndoorBikeSimulation() # Confirm receipt to client - self.Message = ", Bluetooth interface in grade mode" + "bleBless: windspeed=%s, TargetGrade=%s, RollingResistance=%s, WindResistance=%s" % + (self.WindSpeed, self.TargetGrade, self.RollingResistance, self.WindResistance)) + self.notifySetIndoorBikeSimulation() # Confirm receipt to client + self.Message = ", Bluetooth interface in grade mode" elif OpCode == bc.fmcp_StopOrPause: self.Started = False self.logfileWrite("bleBless: Started = False") - self.notifyStopOrPause() # Confirm receipt to client - self.Message = ", Bluetooth interface training stopped" + self.notifyStopOrPause() # Confirm receipt to client + self.Message = ", Bluetooth interface training stopped" elif OpCode == bc.fmcp_Reset: - self.Started = False + self.Started = False self.HasControl = False self.logfileWrite("bleBless: HasControl = False") - self.notifyReset() # Confirm receipt to client - self.Message = ", Bluetooth interface open" + self.notifyReset() # Confirm receipt to client + self.Message = ", Bluetooth interface open" else: self.logfileConsole("bleBless error: Unknown OpCode %s" % OpCode) ResultCode = bc.fmcp_ControlNotPermitted - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Response: - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- ResponseCode = 0x80 - info = struct.pack( - bc.little_endian + bc.unsigned_char * 3, - ResponseCode, - OpCode, - ResultCode, - ) + info = struct.pack(bc.little_endian + bc.unsigned_char * 3, ResponseCode, OpCode, ResultCode) characteristic.value = info - self.BlessServer.update_value( - bc.sFitnessMachineUUID, bc.cFitnessMachineControlPointUUID - ) + self.BlessServer.update_value(bc.sFitnessMachineUUID, bc.cFitnessMachineControlPointUUID) if False: - self.logfileWrite( - "bleBless: New value for characteristic %s = %s" - % (char, HexSpace(info)) - ) + self.logfileWrite("bleBless: New value for characteristic %s = %s" % (char, HexSpace(info))) # -------------------------------------------------------------------------- # After that a characteristic is written, it must also be confirmed through @@ -783,89 +671,45 @@ def WriteRequest( # -------------------------------------------------------------------------- def notifyStartOrResume(self): self.logfileWrite("bleBless.notifyStartOrResume()") - info = struct.pack( - bc.little_endian + bc.unsigned_char, - bc.fms_FitnessMachineStartedOrResumedByUser, - ) - characteristic = self.BlessServer.get_characteristic( - bc.cFitnessMachineStatusUUID - ) + info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_FitnessMachineStartedOrResumedByUser) + characteristic = self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cFitnessMachineStatusUUID, - ) + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID,) def notifySetTargetPower(self): self.logfileWrite("bleBless.notifySetTargetPower()") - info = struct.pack( - bc.little_endian + bc.unsigned_char + bc.unsigned_short, - bc.fms_TargetPowerChanged, - self.TargetPower, - ) - characteristic = self.BlessServer.get_characteristic( - bc.cFitnessMachineStatusUUID - ) + info = struct.pack(bc.little_endian + bc.unsigned_char + bc.unsigned_short, bc.fms_TargetPowerChanged, self.TargetPower) + characteristic = self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cFitnessMachineStatusUUID, - ) + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID,) def notifySetIndoorBikeSimulation(self): self.logfileWrite("bleBless.notifySetIndoorBikeSimulation()") - windSpeed = int(self.WindSpeed / 0.001) - grade = int(self.TargetGrade / 0.01) - crr = int(self.RollingResistance / 0.0001) - cw = int(self.WindResistance / 0.01) - info = struct.pack( - bc.little_endian + bc.unsigned_char + bc.short * 2 + bc.unsigned_char * 2, - bc.fms_IndoorBikeSimulationParametersChanged, - windSpeed, - grade, - crr, - cw, - ) - characteristic = self.BlessServer.get_characteristic( - bc.cFitnessMachineStatusUUID - ) + + windSpeed = int(self.WindSpeed / 0.001 ) + grade = int(self.TargetGrade / 0.01 ) + crr = int(self.RollingResistance / 0.0001) + cw = int(self.WindResistance / 0.01 ) + + info = struct.pack(bc.little_endian + bc.unsigned_char + bc.short * 2 + bc.unsigned_char * 2, + bc.fms_IndoorBikeSimulationParametersChanged, windSpeed, grade, crr, cw) + characteristic = self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cFitnessMachineStatusUUID, - ) + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID,) def notifyStopOrPause(self): self.logfileWrite("bleBless.notifyStopOrPause()") - info = struct.pack( - bc.little_endian + bc.unsigned_char, - bc.fms_FitnessMachineStoppedOrPausedByUser, - ) - characteristic = self.BlessServer.get_characteristic( - bc.cFitnessMachineStatusUUID - ) + info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_FitnessMachineStoppedOrPausedByUser) + characteristic = self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cFitnessMachineStatusUUID, - ) + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID,) def notifyReset(self): self.logfileWrite("bleBless.notifyReset()") info = struct.pack(bc.little_endian + bc.unsigned_char, bc.fms_Reset) - characteristic = self.BlessServer.get_characteristic( - bc.cFitnessMachineStatusUUID - ) + characteristic = self.BlessServer.get_characteristic(bc.cFitnessMachineStatusUUID) self.rate_limited_call(setattr, characteristic, "value", info) - self.rate_limited_call( - self.BlessServer.update_value, - bc.sFitnessMachineUUID, - bc.cFitnessMachineStatusUUID, - ) + self.rate_limited_call(self.BlessServer.update_value, bc.sFitnessMachineUUID, bc.cFitnessMachineStatusUUID,) # ------------------------------------------------------------------------------ # S i m u l a t o r @@ -879,33 +723,27 @@ def notifyReset(self): # SetAthleteData() and/or SetTrainerData() # Refresh() # use class-attributes (TargetMode, TargetPower, ...) - # + # # # Output The interface is used, no further output. # ------------------------------------------------------------------------------ def Simulator(self): - self.logfileConsole( - "---------------------------------------------------------------------------------------" - ) + self.logfileConsole("---------------------------------------------------------------------------------------") self.logfileConsole("FortiusAnt simulated trainer is active") - self.logfileConsole( - "Start a training in a CTP; 5 seconds after completing the training, simulation will end" - ) - self.logfileConsole( - "---------------------------------------------------------------------------------------" - ) + self.logfileConsole("Start a training in a CTP; 5 seconds after completing the training, simulation will end") + self.logfileConsole("---------------------------------------------------------------------------------------") - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # FortiusAntServer is now active # read/write through read_Request() and write_Request() functions - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- ClientWasConnected = False i = 5 while i: - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Stop 5 seconds after a client is disconnected # In the meantime, Start, Power/Grade, Stop is expected.... - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- if self.ClientConnected: ClientWasConnected = True @@ -913,39 +751,39 @@ def Simulator(self): i -= 1 pass - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Update trainer info every second - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- time.sleep(1) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Create some fancy data - # ----------------------------------------------------------------------- - s = int(time.time() % 60) # Seconds - HeartRate = int(000 + s) - Cadence = int(100 + s) - CurrentSpeed = round(200 + s, 1) - CurrentPower = int(300 + s) - SteeringAngle = s - 30 # -30 ... 30 - - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- + s = int(time.time() % 60) # Seconds + HeartRate = int(000 + s) + Cadence = int(100 + s ) + CurrentSpeed = round(200 + s,1) + CurrentPower = int(300 + s) + SteeringAngle= s - 30 # -30 ... 30 + + #----------------------------------------------------------------------- # Update actual values of Athlete and Trainer and Steering - # ----------------------------------------------------------------------- - self.SetAthleteData(HeartRate) - self.SetTrainerData(CurrentSpeed, Cadence, CurrentPower) + #----------------------------------------------------------------------- + self.SetAthleteData (HeartRate) + self.SetTrainerData (CurrentSpeed, Cadence, CurrentPower) self.SetSteeringAngle(SteeringAngle) - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Allow class to take care that attributes are accurate - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- self.Refresh() - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # Here the real application can take action, usually adjust the # resistance of the bicycle and/or display the target Mode/Power/Grade. - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- # All we do is show current status / target # --> Four variables are filled by ourselves, which in reality comes from # the athlete's HeartRateMonitor and the fitness bike (see above). @@ -954,52 +792,37 @@ def Simulator(self): # # --> TargetMode, TargetPower, TargetGrade are provided by the Cycling # Trining Program (CTP), the client to this fitness machine. - # ----------------------------------------------------------------------- + #----------------------------------------------------------------------- print( - "Client=%-5s HasControl=%-5s Started=%-5s TargetPower=%4s TargetGrade=%-6s Speed=%3s Cadence=%3s, Power=%3s, HeartRate=%3s Angle=%5s" - % ( - self.ClientConnected, - self.HasControl, - self.Started, - self.TargetPower, - self.TargetGrade, - self.CurrentSpeed, - self.Cadence, - self.CurrentPower, - self.HeartRate, - self.SteeringAngle, - ) - ) + "Client=%-5s HasControl=%-5s Started=%-5s TargetPower=%4s TargetGrade=%-6s Speed=%3s Cadence=%3s, Power=%3s, HeartRate=%3s Angle=%5s" % + (self.ClientConnected, self.HasControl, self.Started, self.TargetPower, self.TargetGrade, self.CurrentSpeed, self.Cadence, self.CurrentPower, self.HeartRate, self.SteeringAngle)) self.logfileConsole("FortiusAnt simulated trainer is stopped") self.logfileConsole("---------------------------------------") - # ============================================================================== # Main program # ============================================================================== if __name__ == "__main__": - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Initialization - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- if not BlessExample: debug.activate(debug.Ble | debug.logging_DEBUG) logfile.Open() - print("FTMS server in FortiusAnt context") + print('FTMS server in FortiusAnt context') else: - logging.basicConfig( - level=logging.DEBUG - ) # pylint:disable=invalid-name,used-before-assignment,undefined-variable + logging.basicConfig(level=logging.DEBUG) # pylint:disable=invalid-name,used-before-assignment,undefined-variable logger = logging.getLogger(name=__name__) - print("FTMS server in bless/example context") + print('FTMS server in bless/example context') print("bleBless started") print("----------------") - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # This is what it's all about - # --------------------------------------------------------------------------- - FortiusAntServer = clsFTMS_bless(True) # clv.ble + #--------------------------------------------------------------------------- + FortiusAntServer = clsFTMS_bless(True) # clv.ble print("Message=%s" % FortiusAntServer.Message) b = FortiusAntServer.Open() @@ -1010,9 +833,9 @@ def Simulator(self): FortiusAntServer.Close() print("Message=%s" % FortiusAntServer.Message) - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # Termination - # --------------------------------------------------------------------------- + #--------------------------------------------------------------------------- print("bleBless ended") """ @@ -1031,7 +854,7 @@ def Simulator(self): Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=201 Cadence=101, Power=301, HeartRate= 1 Angle= -29 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=202 Cadence=102, Power=302, HeartRate= 2 Angle= -28 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=203 Cadence=103, Power=303, HeartRate= 3 Angle= -27 - +... Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=258 Cadence=158, Power=358, HeartRate= 58 Angle= 28 Client=False HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=259 Cadence=159, Power=359, HeartRate= 59 Angle= 29 Client=True HasControl=False Started=False TargetPower= 100 TargetGrade=0 Speed=200 Cadence=100, Power=300, HeartRate= 0 Angle= -30 @@ -1123,7 +946,7 @@ def Simulator(self): Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 E8 41 Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 F0 C1 Characteristic 347b0030-7635-408b-8918-8ff3949ce592 changed | value: 00 00 E8 C1 - +... Disconnecting from 4F:69:66:13:7A:2A (LT-ENTERTAIN) Disconnected from 4F:69:66:13:7A:2A (LT-ENTERTAIN) @@ -1176,4 +999,4 @@ def Simulator(self): |--347b0032-7635-408b-8918-8ff3949ce592: Indicate == cSteeringTxUUID |------00002902-0000-1000-8000-00805f9b34fb: -""" +""" \ No newline at end of file