diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0966a6a..4867486 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -14,7 +14,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ["3.10", "3.11", "3.12", "3.13", "3.14", "pypy-3.10"] + python-version: ["3.10", "3.11", "3.12", "3.13", "pypy-3.10"] os: [ubuntu-latest] steps: - uses: actions/checkout@v4 @@ -25,9 +25,13 @@ jobs: python-version: ${{ matrix.python-version }} allow-prereleases: true + # https://github.com/astral-sh/setup-uv - name: Install UV - uses: astral-sh/setup-uv@v5 + uses: astral-sh/setup-uv@v6 with: + activate-environment: true + enable-cache: true + ignore-nothing-to-cache: true python-version: ${{ matrix.python-version }} - name: Install PyMBus diff --git a/CHANGELOG.md b/CHANGELOG.md index 73e687b..a16dbef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,38 @@ # CHANGELOG +## v0.4.0 + +- remake VIF codes ([PR-17](https://github.com/stankudrow/pymbus/pull/17) + [PR-18](https://github.com/stankudrow/pymbus/pull/18)): + - only one VIFCode (data)class for coding + - the `get_vif_code` factory method is improved + +- subclass TelegramContainer from the `Sequence` ABC ([PR-16](https://github.com/stankudrow/pymbus/pull/16)) + +- subclass the TelegramField class from Python `int` type ([PR-15](https://github.com/stankudrow/pymbus/pull/15)): + - make TelegramField support the operations that `int` does; + - ensure the byte range validation for TelegramField -> the `validate` flag removed; + - unburden and simplify TelegramField successors and classes derived from TelegramContainer; + - revises the [PR-11](https://github.com/stankudrow/pymbus/pull/11). + +- submodule specific Telegram fields (A, C, CI, Data, Value) ([PR-12](https://github.com/stankudrow/pymbus/pull/12)) + +- improve Telegram classes ([PR-11](https://github.com/stankudrow/pymbus/pull/11)): + - the API of TelegramField is enriched: + - `total_ordering` is enabled - the full set of arithmetic comparison operators are on + - bitwise operator support: + - and (&) + - or (|) + - xor (^) + - inversion (~) + - the `validate` flag is added + - the `byte` attribute is removed + - the API of TelegramContainer is enriched: + - `total_ordering` is enabled + - the `validate` flag is added + - all entities related to the the TelegramField and TelegramContainer classes are updated + - introduce the `utils` module with the `validate_byte` function + - some obsolete entities are removed + ## v0.3.0 [PR-8](https://github.com/stankudrow/pymbus/pull/8) @@ -12,7 +45,7 @@ Added: Changed: -- update project metadata; +- update the project metadata; - lint the code and test base(s). ## v0.2.0 @@ -27,7 +60,7 @@ Added: Changed: -- API: reconsidered and flatten; +- API: reconsidered and flattened; - the former `DataRecord` class is now `DataRecordHeader`. ## v0.1.0 diff --git a/pyproject.toml b/pyproject.toml index 136495e..db1b80b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,8 +15,6 @@ classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "Intended Audience :: Information Technology", - "License :: OSI Approved :: MIT License", - "License :: Public Domain", "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python :: 3 :: Only", @@ -24,8 +22,8 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", - "Programming Language :: Python :: 3.14", - "Programming Language :: Python :: Implementation :: CPython" + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy" ] keywords = ["meter-bus", "m-bus", "meters"] dependencies = [] @@ -40,6 +38,12 @@ dev = [ "ruff>=0.9.7", ] +[[tool.uv.index]] +name = "testpypi" +url = "https://test.pypi.org/simple/" +publish-url = "https://test.pypi.org/legacy/" +explicit = true + [tool.coverage.run] branch = true omit = ["tests"] @@ -119,7 +123,10 @@ select = [ "B", # flake8-bugbear "BLE", # flake8-blind-except "C4", # flake8-comprehensions + "C90", # mccabe + # "D", # pydocstyle "DTZ", # flake8-datetimez + "E", # pycodestyle (Error) "EM", # flake8-errmsg "ERA", # eradicate "F", # Pyflakes @@ -127,6 +134,7 @@ select = [ "I", # isort "ICN", # flake8-import-conventions "ISC", # flake8-implicit-str-concat + "N", # pep8-naming (N) "PIE", # flake8-pie "PT", # pytest-style "PYI", # flake8-pyi @@ -136,23 +144,33 @@ select = [ "S", # flake8-bandit "SLF", # flake8-self "SIM", # flake8-simplify + "SLOT", # flake8-slots "T", # flake8-print-linter "T10", # flake8-debugger "T20", # flake8-print "TC", # flake8-type-checking "TID", # flake8-tidy-imports + "TRY", # tryceratops "UP", # pyupgrade + "W", # pycodestyle (Warning) + "YTT", # flake8-2020 ] ignore = [ "ANN401", # https://docs.astral.sh/ruff/rules/any-type/ + "D105", # https://docs.astral.sh/ruff/rules/undocumented-magic-method/ "ISC001", # https://docs.astral.sh/ruff/rules/single-line-implicit-string-concatenation/ ] fixable = ["I", "ICN", "ISC"] [tool.ruff.lint.per-file-ignores] -"__init__.py" = ["F401"] +"{src,tests}/*.py" = [ + "N817", # https://docs.astral.sh/ruff/rules/camelcase-imported-as-acronym/ +] +"src/pymbus/codes/vif.py" = ["ERA001"] "tests/*.py" = [ "ANN201", # https://docs.astral.sh/ruff/rules/missing-return-type-undocumented-public-function/ + "D", + "ERA", "FBT001", # https://docs.astral.sh/ruff/rules/boolean-type-hint-positional-argument/ "PT011", # https://docs.astral.sh/ruff/rules/pytest-raises-too-broad/ "S", diff --git a/src/pymbus/codes/value_info.py b/src/pymbus/codes/value_info.py deleted file mode 100644 index eb79505..0000000 --- a/src/pymbus/codes/value_info.py +++ /dev/null @@ -1,195 +0,0 @@ -from typing import Any - -from pymbus.exceptions import MBusError -from pymbus.telegrams.fields import ValueInformationField as VIF - - -class ValueInformationFieldCode: - CMASK: int - EMASK: None | int = None - DESC: None | str = None - UNIT: Any = None - - def __init__(self, vif: VIF) -> None: - self.validate_vif(vif) - self._vif = vif - self._range = None - - @property - def multiplier(self) -> None | int | float: - return self._range - - def validate_vif(self, vif: VIF) -> None: - byte = vif.byte - cmask = self.CMASK - if emask := self.EMASK: - code = byte & (~emask) - - if (code & 0x7F) != cmask: - msg = f"the {byte} does not fit to the code {cmask}" - raise MBusError(msg) - - -class EnergyWattHourVIFCode(ValueInformationFieldCode): - CMASK = 0b0000_0000 - EMASK = 0b0000_0111 - DESC = "energy" - UNIT = "Wh" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 3) - - -class EnergyJouleVIFCode(ValueInformationFieldCode): - CMASK = 0b0000_1000 - EMASK = 0b0000_0111 - DESC = "energy" - UNIT = "J" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10**pwr - - -class VolumeMeterCubeVIFCode(ValueInformationFieldCode): - CMASK = 0b0001_0000 - EMASK = 0b0000_0111 - DESC = "volume" - UNIT = "m^3" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 6) - - -class MassKilogramVIFCode(ValueInformationFieldCode): - CMASK = 0b0001_1000 - EMASK = 0b0000_0111 - DESC = "mass" - UNIT = "kg" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 3) - - -class OnTimeVIFCode(ValueInformationFieldCode): - CMASK = 0b0010_0000 - EMASK = 0b0000_0011 - DESC = "on time" - UNIT = None - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - unit = self._vif.byte & self.EMASK - if unit == 3: - self.UNIT = "day" - if unit == 2: - self.UNIT = "hour" - if unit == 1: - self.UNIT = "minute" - if unit == 0: - self.UNIT = "second" - - -class OperatingTimeVIFCode(OnTimeVIFCode): - CMASK = 0b0010_0100 - - -class PowerWattVIFCode(ValueInformationFieldCode): - CMASK = 0b0010_1000 - EMASK = 0b0000_0111 - DESC = "power" - UNIT = "W" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 3) - - -class PowerJoulePerHourVIFCode(ValueInformationFieldCode): - CMASK = 0b0011_0000 - EMASK = 0b0000_0111 - DESC = "power" - UNIT = "J/h" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr) - - -class VolumeFlowCubicMeterPerHourVIFCode(ValueInformationFieldCode): - CMASK = 0b0011_1000 - EMASK = 0b0000_0111 - DESC = "volume flow" - UNIT = "m^3/h" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 6) - - -class VolumeFlowCubicMeterPerMinuteVIFCode(ValueInformationFieldCode): - CMASK = 0b0100_0000 - EMASK = 0b0000_0111 - DESC = "volume flow" - UNIT = "m^3/min" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 7) - - -class VolumeFlowCubicMeterPerSecondVIFCode(ValueInformationFieldCode): - CMASK = 0b0100_1000 - EMASK = 0b0000_0111 - DESC = "volume flow" - UNIT = "m^3/s" - - def __init__(self, vif: VIF) -> None: - super().__init__(vif) - - pwr = self._vif.byte & self.EMASK - self._range = 10 ** (pwr - 9) - - -VIF_CODE_TYPES: set[type[ValueInformationFieldCode]] = { - EnergyWattHourVIFCode, - EnergyJouleVIFCode, - VolumeMeterCubeVIFCode, - MassKilogramVIFCode, - OnTimeVIFCode, - OperatingTimeVIFCode, - PowerWattVIFCode, - PowerJoulePerHourVIFCode, - VolumeFlowCubicMeterPerHourVIFCode, - VolumeFlowCubicMeterPerMinuteVIFCode, - VolumeFlowCubicMeterPerSecondVIFCode, -} - - -def get_vif_code(vif: VIF) -> None | ValueInformationFieldCode: - for code_type in VIF_CODE_TYPES: - try: - return code_type(vif) - except MBusError: - pass - return None diff --git a/src/pymbus/codes/vif.py b/src/pymbus/codes/vif.py new file mode 100644 index 0000000..b858546 --- /dev/null +++ b/src/pymbus/codes/vif.py @@ -0,0 +1,844 @@ +"""M-Bus Value Information Field Code module.""" + +from dataclasses import dataclass +from enum import Enum + +from pymbus.telegrams.fields import ValueInformationField as VIF + + +class VIFCodeDescription(str, Enum): + """VIF code description(s).""" + + no_description = "" + energy = "energy" + volume = "volume" + mass = "mass" + on_time = "on time" + operating_time = "operating time" + power = "power" + volume_flow = "volume flow" + mass_flow = "mass flow" + flow_temp = "flow temperature" + return_temp = "return temperature" + temp_difference = "temperature_difference" + external_temp = "external_temperature" + pressure = "pressure" + time_point = "time point" + hca = "heat cost allocator" + reserved = "reserved" + averaging_duration = "averaging duration" + actuality_duration = "actuality duration" + fabrication_no = "fabrication no" + enhanced = "enhanced" + bus_address = "bus address" + user = "user definable" + any = "any" + manufacturer = "manufacturer specific" + extension = "extension" + + +class VIFCodeUnit(str, Enum): + """VIF code unit(s).""" + + unknown = "unknown" + watt_hour = "Wh" + joule = "J" + meter_cubic = "m^3" + kilogram = "kg" + second = "s" + watt = "W" + joule_per_hour = "J/h" + meter_cubic_per_hour = "m^3/h" + meter_cubic_per_minute = "m^3/min" + meter_cubic_per_second = "m^3/s" + kilogram_per_hour = "kg/h" + celsius = "C" + kelvin = "K" + bar = "bar" + hca = "H.C.A. Units" + date = "date" # for a time point + datetime = "datetime" # for a time point + + +@dataclass +class VIFCode: + """Value Information Code.""" + + code: int + coef: int | float = 1 + desc: str | VIFCodeDescription = VIFCodeDescription.no_description + unit: str | VIFCodeUnit = VIFCodeUnit.unknown + + +_VIF_CODE_MAP: dict[int, VIFCode] = { + # E000_0nnn - Energy (Watt * hour = Wh) + 0b0000_0000: VIFCode( + code=0x00, + coef=1e-3, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0001: VIFCode( + code=0x01, + coef=1e-2, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0010: VIFCode( + code=0x02, + coef=1e-1, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0011: VIFCode( + code=0x03, + coef=1e0, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0100: VIFCode( + code=0x04, + coef=1e1, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0101: VIFCode( + code=0x05, + coef=1e2, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0110: VIFCode( + code=0x06, + coef=1e3, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + 0b0000_0111: VIFCode( + code=0x07, + coef=1e4, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.watt_hour, + ), + # E000_1nnn - Energy (Joule = J) + 0b0000_1000: VIFCode( + code=0x08, + coef=1e0, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1001: VIFCode( + code=0x09, + coef=1e1, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1010: VIFCode( + code=0x0A, + coef=1e2, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1011: VIFCode( + code=0x0B, + coef=1e3, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1100: VIFCode( + code=0x0C, + coef=1e4, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1101: VIFCode( + code=0x0D, + coef=1e5, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1110: VIFCode( + code=0x0E, + coef=1e6, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + 0b0000_1111: VIFCode( + code=0x0F, + coef=1e7, + desc=VIFCodeDescription.energy, + unit=VIFCodeUnit.joule, + ), + # E001_0nnn - Volume (Meter cubic = m^3) + 0b0001_0000: VIFCode( + code=0x10, + coef=1e-6, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0001: VIFCode( + code=0x11, + coef=1e-5, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0010: VIFCode( + code=0x12, + coef=1e-4, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0011: VIFCode( + code=0x13, + coef=1e-3, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0100: VIFCode( + code=0x14, + coef=1e-2, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0101: VIFCode( + code=0x15, + coef=1e-1, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0110: VIFCode( + code=0x16, + coef=1e0, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + 0b0001_0111: VIFCode( + code=0x17, + coef=1e1, + desc=VIFCodeDescription.volume, + unit=VIFCodeUnit.meter_cubic, + ), + # E001_1nnn - Mass (Kilogram = kg) + 0b0001_1000: VIFCode( + code=0x18, + coef=1e-3, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1001: VIFCode( + code=0x19, + coef=1e-2, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1010: VIFCode( + code=0x1A, + coef=1e-1, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1011: VIFCode( + code=0x1B, + coef=1e0, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1100: VIFCode( + code=0x1C, + coef=1e1, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1101: VIFCode( + code=0x1D, + coef=1e2, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1110: VIFCode( + code=0x1E, + coef=1e3, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + 0b0001_1111: VIFCode( + code=0x1F, + coef=1e4, + desc=VIFCodeDescription.mass, + unit=VIFCodeUnit.kilogram, + ), + # E010_00nn - On time (in seconds) + 0b0010_0000: VIFCode( + code=0x20, + coef=1, + desc=VIFCodeDescription.on_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0001: VIFCode( + code=0x21, + coef=60, + desc=VIFCodeDescription.on_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0010: VIFCode( + code=0x22, + coef=3600, + desc=VIFCodeDescription.on_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0011: VIFCode( + code=0x23, + coef=86400, + desc=VIFCodeDescription.on_time, + unit=VIFCodeUnit.second, + ), + # E010_01nn - Operating Time (like On Time) + 0b0010_0100: VIFCode( + code=0x24, + coef=1, + desc=VIFCodeDescription.operating_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0101: VIFCode( + code=0x25, + coef=60, + desc=VIFCodeDescription.operating_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0110: VIFCode( + code=0x26, + coef=3600, + desc=VIFCodeDescription.operating_time, + unit=VIFCodeUnit.second, + ), + 0b0010_0111: VIFCode( + code=0x27, + coef=86400, + desc=VIFCodeDescription.operating_time, + unit=VIFCodeUnit.second, + ), + # E010_1nnn - Power (Watt = W) + 0b0010_1000: VIFCode( + code=0x28, + coef=1e-3, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1001: VIFCode( + code=0x29, + coef=1e-2, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1010: VIFCode( + code=0x2A, + coef=1e-1, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1011: VIFCode( + code=0x2B, + coef=1e0, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1100: VIFCode( + code=0x2C, + coef=1e1, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1101: VIFCode( + code=0x2D, + coef=1e2, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1110: VIFCode( + code=0x2E, + coef=1e3, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + 0b0010_1111: VIFCode( + code=0x2F, + coef=1e4, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.watt, + ), + # E011_0nnn - Power (Joule per hour = J/h) + 0b0011_0000: VIFCode( + code=0x30, + coef=1e0, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0001: VIFCode( + code=0x31, + coef=1e1, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0010: VIFCode( + code=0x32, + coef=1e2, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0011: VIFCode( + code=0x33, + coef=1e3, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0100: VIFCode( + code=0x34, + coef=1e4, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0101: VIFCode( + code=0x35, + coef=1e5, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0110: VIFCode( + code=0x36, + coef=1e6, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + 0b0011_0111: VIFCode( + code=0x37, + coef=1e7, + desc=VIFCodeDescription.power, + unit=VIFCodeUnit.joule_per_hour, + ), + # E011_1nnn - Volume flow (Meter cubic per hour = m^3/h) + 0b0011_1000: VIFCode( + code=0x38, + coef=1e-6, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1001: VIFCode( + code=0x39, + coef=1e-5, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1010: VIFCode( + code=0x3A, + coef=1e-4, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1011: VIFCode( + code=0x3B, + coef=1e-3, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1100: VIFCode( + code=0x3C, + coef=1e-2, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1101: VIFCode( + code=0x3D, + coef=1e-1, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1110: VIFCode( + code=0x3E, + coef=1e0, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + 0b0011_1111: VIFCode( + code=0x3F, + coef=1e1, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_hour, + ), + # E100_0nnn - Volume flow (Meter cubic per minute = m^3/min) + 0b0100_0000: VIFCode( + code=0x40, + coef=1e-7, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0001: VIFCode( + code=0x41, + coef=1e-6, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0010: VIFCode( + code=0x42, + coef=1e-5, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0011: VIFCode( + code=0x43, + coef=1e-4, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0100: VIFCode( + code=0x44, + coef=1e-3, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0101: VIFCode( + code=0x45, + coef=1e-2, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0110: VIFCode( + code=0x46, + coef=1e-1, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + 0b0100_0111: VIFCode( + code=0x47, + coef=1e0, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_minute, + ), + # E100_1nnn - Volume flow (Meter cubic per second = m^3/s) + 0b0100_1000: VIFCode( + code=0x48, + coef=1e-9, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1001: VIFCode( + code=0x49, + coef=1e-8, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1010: VIFCode( + code=0x4A, + coef=1e-7, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1011: VIFCode( + code=0x4B, + coef=1e-6, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1100: VIFCode( + code=0x4C, + coef=1e-5, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1101: VIFCode( + code=0x4D, + coef=1e-4, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1110: VIFCode( + code=0x4E, + coef=1e-3, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + 0b0100_1111: VIFCode( + code=0x4F, + coef=1e-2, + desc=VIFCodeDescription.volume_flow, + unit=VIFCodeUnit.meter_cubic_per_second, + ), + # E101_0nnn - Mass flow (Kilogram per hour = kg/h) + 0b0101_0000: VIFCode( + code=0x50, + coef=1e-3, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0001: VIFCode( + code=0x51, + coef=1e-2, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0010: VIFCode( + code=0x52, + coef=1e-1, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0011: VIFCode( + code=0x53, + coef=1e0, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0100: VIFCode( + code=0x54, + coef=1e1, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0101: VIFCode( + code=0x55, + coef=1e2, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0110: VIFCode( + code=0x56, + coef=1e3, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + 0b0101_0111: VIFCode( + code=0x57, + coef=1e4, + desc=VIFCodeDescription.mass_flow, + unit=VIFCodeUnit.kilogram_per_hour, + ), + # E101_10nn - Flow temperature (Celsius = C) + 0b0101_1000: VIFCode( + code=0x58, + coef=1e-3, + desc=VIFCodeDescription.flow_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1001: VIFCode( + code=0x59, + coef=1e-2, + desc=VIFCodeDescription.flow_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1010: VIFCode( + code=0x5A, + coef=1e-1, + desc=VIFCodeDescription.flow_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1011: VIFCode( + code=0x5B, + coef=1e0, + desc=VIFCodeDescription.flow_temp, + unit=VIFCodeUnit.celsius, + ), + # # E101_11nn - Return temperature (Celsius = C) + 0b0101_1100: VIFCode( + code=0x5C, + coef=1e-3, + desc=VIFCodeDescription.return_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1101: VIFCode( + code=0x5D, + coef=1e-2, + desc=VIFCodeDescription.return_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1110: VIFCode( + code=0x5E, + coef=1e-1, + desc=VIFCodeDescription.return_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0101_1111: VIFCode( + code=0x5F, + coef=1e0, + desc=VIFCodeDescription.return_temp, + unit=VIFCodeUnit.celsius, + ), + # E110_00nn - Temperature difference (Kelvin = K) + 0b0110_0000: VIFCode( + code=0x60, + coef=1e-3, + desc=VIFCodeDescription.temp_difference, + unit=VIFCodeUnit.kelvin, + ), + 0b0110_0001: VIFCode( + code=0x61, + coef=1e-2, + desc=VIFCodeDescription.temp_difference, + unit=VIFCodeUnit.kelvin, + ), + 0b0110_0010: VIFCode( + code=0x62, + coef=1e-1, + desc=VIFCodeDescription.temp_difference, + unit=VIFCodeUnit.kelvin, + ), + 0b0110_0011: VIFCode( + code=0x63, + coef=1e0, + desc=VIFCodeDescription.temp_difference, + unit=VIFCodeUnit.kelvin, + ), + # E110_01nn - External temperature (Celsius = C) + 0b0110_0100: VIFCode( + code=0x64, + coef=1e-3, + desc=VIFCodeDescription.external_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0110_0101: VIFCode( + code=0x65, + coef=1e-2, + desc=VIFCodeDescription.external_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0110_0110: VIFCode( + code=0x66, + coef=1e-1, + desc=VIFCodeDescription.external_temp, + unit=VIFCodeUnit.celsius, + ), + 0b0110_0111: VIFCode( + code=0x67, + coef=1e0, + desc=VIFCodeDescription.external_temp, + unit=VIFCodeUnit.celsius, + ), + # E110_10nn - Pressure (bar) + 0b0110_1000: VIFCode( + code=0x68, + coef=1e-3, + desc=VIFCodeDescription.pressure, + unit=VIFCodeUnit.bar, + ), + 0b0110_1001: VIFCode( + code=0x69, + coef=1e-2, + desc=VIFCodeDescription.pressure, + unit=VIFCodeUnit.bar, + ), + 0b0110_1010: VIFCode( + code=0x6A, + coef=1e-1, + desc=VIFCodeDescription.pressure, + unit=VIFCodeUnit.bar, + ), + 0b0110_1011: VIFCode( + code=0x6B, + coef=1e0, + desc=VIFCodeDescription.pressure, + unit=VIFCodeUnit.bar, + ), + # E110_110n - Time point (date or datetime) + 0b0110_1100: VIFCode( + code=0x6C, + coef=1, + desc=VIFCodeDescription.time_point, + unit=VIFCodeUnit.date, + ), + 0b0110_1101: VIFCode( + code=0x6D, + coef=1, + desc=VIFCodeDescription.time_point, + unit=VIFCodeUnit.datetime, + ), + # E110_1110 = Heat Cost Allocator (H.C.A.) Units + 0b0110_1110: VIFCode( + code=0x6E, coef=1e0, desc=VIFCodeDescription.hca, unit=VIFCodeUnit.hca + ), + # Reserved + 0b0110_1111: VIFCode(code=0x6F, desc=VIFCodeDescription.reserved), + # E111_00nn - Averaging duration (in seconds) + 0b0111_0000: VIFCode( + code=0x70, + coef=1, + desc=VIFCodeDescription.averaging_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0001: VIFCode( + code=0x71, + coef=60, + desc=VIFCodeDescription.averaging_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0010: VIFCode( + code=0x72, + coef=3600, + desc=VIFCodeDescription.averaging_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0011: VIFCode( + code=0x73, + coef=86400, + desc=VIFCodeDescription.averaging_duration, + unit=VIFCodeUnit.second, + ), + # E111_01nn - Actuality duration (in seconds) + 0b0111_0100: VIFCode( + code=0x74, + coef=1, + desc=VIFCodeDescription.actuality_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0101: VIFCode( + code=0x75, + coef=60, + desc=VIFCodeDescription.actuality_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0110: VIFCode( + code=0x76, + coef=3600, + desc=VIFCodeDescription.actuality_duration, + unit=VIFCodeUnit.second, + ), + 0b0111_0111: VIFCode( + code=0x77, + coef=86400, + desc=VIFCodeDescription.actuality_duration, + unit=VIFCodeUnit.second, + ), + # E111_1000 + 0b0111_1000: VIFCode(code=0x78, desc=VIFCodeDescription.fabrication_no), + # E111_1001 + 0b0111_1001: VIFCode(code=0x79, desc=VIFCodeDescription.enhanced), + # E111_1010 + 0b0111_1010: VIFCode(code=0x7A, desc=VIFCodeDescription.bus_address), + # special purpose VIF codes + 0b0111_1100: VIFCode(code=0x7C, desc=VIFCodeDescription.user), + 0b0111_1110: VIFCode(code=0x7E, desc=VIFCodeDescription.any), + 0b0111_1111: VIFCode(code=0x7F, desc=VIFCodeDescription.manufacturer), + # special purpose VIF codes + 0b1111_1011: VIFCode(code=0xFB, desc=VIFCodeDescription.extension), + 0b1111_1101: VIFCode(code=0xFD, desc=VIFCodeDescription.extension), +} + + +def get_vif_code(byte: int | VIF) -> None | VIFCode: # noqa: C901 + """Return the VIFCode according to the given VIF. + + Parameters + ---------- + byte : int | VIF + either an integer or VIF class + + Raises + ------ + MBusValidationError + if byte is not within the byte range + + Returns + ------- + None | VIFCode + """ + # validate byte -> ensure VIF + byte = int(VIF(int(byte))) + + return _VIF_CODE_MAP.get(byte) diff --git a/src/pymbus/mbtypes.py b/src/pymbus/mbtypes.py index ba96974..3448947 100644 --- a/src/pymbus/mbtypes.py +++ b/src/pymbus/mbtypes.py @@ -85,7 +85,6 @@ def parse_bcd_uint(ibytes: BytesType) -> int: ------- int """ - bytez = _validate_non_empty_bytes(ibytes) msp, lsp = 0b1111_0000, 0b0000_1111 @@ -140,7 +139,6 @@ def parse_int(ibytes: BytesType) -> int: ------- int """ - bytez = _validate_non_empty_bytes(ibytes) return int.from_bytes( @@ -168,7 +166,6 @@ def parse_uint(ibytes: BytesType) -> int: ------- int """ - bytez = _validate_non_empty_bytes(ibytes) return int.from_bytes( @@ -199,7 +196,6 @@ def parse_bool(ibytes: BytesType) -> bool: ------- bool """ - return bool(parse_uint(ibytes)) @@ -227,7 +223,6 @@ def parse_float(ibytes: BytesType) -> float: ------- float """ - it = iter(ibytes) try: frame = [next(it) for _ in range(4)] @@ -250,13 +245,11 @@ class UnitType: @classmethod def from_bytes(cls, frame: BytesType) -> "UnitType": """Return a `UnitType` from an array of bytes.""" - return cls(frame) @classmethod def from_hexstring(cls, hexstr: str) -> "UnitType": """Return a `UnitType` from a hexadecimal string.""" - barr = bytearray.fromhex(hexstr) return cls.from_bytes(barr) @@ -317,7 +310,6 @@ def parse_unit_type(ibytes: BytesType) -> UnitType: ------- UnitType """ - return UnitType(ibytes) @@ -339,7 +331,6 @@ def parse_unit_type(ibytes: BytesType) -> UnitType: def get_year(lsp: int, msp: int) -> int: """Return the value of years from `lsp` and `msp` parts.""" - year_lsp = lsp & YEAR_MASK_LSB year_msp = msp & YEAR_MASK_MSB @@ -353,13 +344,11 @@ def get_year(lsp: int, msp: int) -> int: def get_month(byte: int) -> int: """Return the value of months from a byte.""" - return byte & MONTH_MASK def get_day(byte: int) -> int: """Return the value of days from a byte.""" - return byte & DAY_MASK @@ -381,7 +370,6 @@ def parse_date(ibytes: BytesType) -> date: ------- date """ - it = iter(ibytes) try: lst = [next(it) for _ in range(2)] @@ -406,20 +394,17 @@ class Date: @classmethod def from_date(cls, pydate: date) -> "Date": """Return a `Date` from a Python date.""" - return cls(year=pydate.year, month=pydate.month, day=pydate.day) @classmethod def from_bytes(cls, frame: BytesType) -> "Date": """Return a `Date` from an array of bytes.""" - pydate = parse_date(frame) return cls.from_date(pydate) @classmethod def from_hexstring(cls, hexstr: str) -> "Date": """Return a `Date` from a hexadecimal string.""" - barr = bytearray.fromhex(hexstr) return cls.from_bytes(barr) @@ -461,19 +446,16 @@ def to_iso_format(self) -> str: def get_hour(byte: int) -> int: """Return the value of hours from a byte.""" - return byte & HOUR_MASK def get_minute(byte: int) -> int: """Return the value of minutes from a byte.""" - return byte & MINUTE_MASK def get_second(byte: int) -> int: """Return the value of seconds from a byte.""" - return byte & SECOND_MASK @@ -489,7 +471,6 @@ def parse_time(ibytes: BytesType) -> time: ------- time """ - it = iter(ibytes) lst = [next(it) for _ in range(2)] @@ -542,14 +523,12 @@ def from_time(cls, pytime: time) -> "Time": @classmethod def from_bytes(cls, frame: BytesType) -> "Time": """Return a `Time` from an array of bytes.""" - pytime = parse_time(frame) return cls.from_time(pytime) @classmethod def from_hexstring(cls, hexstr: str) -> "Time": """Return a `Time` from a hexadecimal string.""" - barr = bytearray.fromhex(hexstr) return cls.from_bytes(barr) @@ -609,7 +588,6 @@ def parse_datetime(ibytes: BytesType) -> datetime: ------- datetime """ - it = iter(ibytes) try: lst = [next(it) for _ in range(4)] @@ -647,7 +625,6 @@ class DateTime: @classmethod def from_datetime(cls, pydatetime: datetime) -> "DateTime": """Return a `DateTime` from a Python datetime.""" - return cls( year=pydatetime.year, month=pydatetime.month, @@ -661,14 +638,12 @@ def from_datetime(cls, pydatetime: datetime) -> "DateTime": @classmethod def from_bytes(cls, frame: BytesType) -> "DateTime": """Return a `DateTime` from an array of bytes.""" - pydatetime = parse_datetime(frame) return cls.from_datetime(pydatetime) @classmethod def from_hexstring(cls, hexstr: str) -> "DateTime": """Return a `DateTime` from a hexadecimal string.""" - barr = bytes.fromhex(hexstr) return cls.from_bytes(barr) diff --git a/src/pymbus/structures/variable.py b/src/pymbus/structures/variable.py new file mode 100644 index 0000000..01d3c50 --- /dev/null +++ b/src/pymbus/structures/variable.py @@ -0,0 +1,30 @@ +"""M-Bus Variable structure module.""" + +from enum import IntEnum + + +# BCD = Type A. Integer = Type B. Real = Type H. +class DataFieldCode(IntEnum): + no_data = 0b0000 + int8 = 0b0001 + int16 = 0b0010 + int24 = 0b0011 + int32 = 0b0100 + real32 = 0b0101 + int48 = 0b0110 + int64 = 0b0111 + readout = 0b1000 + bcd2 = 0b1001 + bcd4 = 0b1010 + bcd6 = 0b1011 + bcd8 = 0b1100 + varlen = 0b1101 + bcd12 = 0b1110 + func = 0b1111 + + +class FunctionFieldCode(IntEnum): + instantaneous = 0b00 + maximum = 0b01 + minimum = 0b10 + error = 0b11 diff --git a/src/pymbus/telegrams/base.py b/src/pymbus/telegrams/base.py index 7e1835e..74f6659 100644 --- a/src/pymbus/telegrams/base.py +++ b/src/pymbus/telegrams/base.py @@ -7,74 +7,28 @@ - a telegram byte refers to the "int | TelegramField" """ -from collections.abc import Iterable, Iterator +import sys +from collections.abc import Iterable, Iterator, Sequence +from functools import total_ordering +from typing import Any from pymbus.exceptions import MBusValidationError +from pymbus.utils import validate_byte -def _validate_byte(number: int) -> int: - """Returns an integer if it is a byte. - - In Python, a byte must be in range(0, 256). - This is the range for an 8-bit unsigned integer. - - Parameters - ---------- - number : int - - Raises - ------ - MbusValidationError - the `number` is out of the [0, 255] segment. - - Returns - ------- - int - """ - - try: - bytes([number]) - except ValueError as e: - msg = f"{number} is not a valid byte" - raise MBusValidationError(msg) from e - - return number - - -class TelegramField: +class TelegramField(int): """The base "Field" class. - It is a base wrapper over a byte value (validation is ensured). - A Field is a part of blocks, frames and other Telegram containers. + Restricts int values to the byte range [0, 255]. + Supports all operations that the `int` class does. """ def __init__(self, byte: int) -> None: - self._byte = _validate_byte(byte) - - def __eq__(self, other: object) -> bool: - sbyte = self.byte - if isinstance(other, TelegramField): - other = other.byte - return sbyte == other - - def __lt__(self, other: "int | TelegramField") -> bool: - sbyte = self.byte - if isinstance(other, TelegramField): - other = other.byte - return sbyte < other - - def __int__(self) -> int: - return self.byte + self._byte = validate_byte(byte) def __repr__(self) -> str: cls_name = type(self).__name__ - return f"{cls_name}(byte={self.byte})" - - @property - def byte(self) -> int: - """Return the byte value of the field.""" - - return self._byte + return f"{cls_name}({self._byte})" TelegramByteType = int | TelegramField @@ -82,44 +36,32 @@ def byte(self) -> int: TelegramByteIterableType = TelegramBytesType | Iterator[TelegramByteType] -def _convert_to_telegram_fields( - ibytes: TelegramByteIterableType, -) -> list[TelegramField]: - return [ - ibyte if isinstance(ibyte, TelegramField) else TelegramField(ibyte) - for ibyte in ibytes - ] - - -class TelegramContainer: +@total_ordering +class TelegramContainer(Sequence): """The base class for Telegram containers. - A telegram container consists of telegram fields - and it is an iterable object, which may also be an iterator. - - The container accepts incoming bytes in a greedy manner. + A telegram container consists of telegram fields. + When being instantiated, the incoming bytes are consumed greedily. """ @classmethod def from_hexstring(cls, hexstr: str) -> "TelegramContainer": """Return a class instance from a hexadecimal string.""" - try: return cls(bytearray.fromhex(hexstr)) except ValueError as e: raise MBusValidationError(str(e)) from e - @classmethod - def from_integers(cls, ints: Iterable[int]) -> "TelegramContainer": - """Return a class instance from a sequence of integers.""" + def __init__( + self, + ibytes: None | TelegramByteIterableType = None, + ) -> None: + self._fields: list[TelegramField] = ( + [TelegramField(ibyte) for ibyte in ibytes] if ibytes else [] + ) - try: - return cls(bytearray(iter(ints))) - except ValueError as e: - raise MBusValidationError(str(e)) from e - - def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - self._fields = _convert_to_telegram_fields(ibytes or []) + def __contains__(self, value: Any) -> bool: + return value in self._fields def __eq__(self, other: object) -> bool: sfields = self._fields @@ -127,9 +69,7 @@ def __eq__(self, other: object) -> bool: other = other._fields return sfields == other - def __getitem__( - self, key: int | slice - ) -> "TelegramField | TelegramContainer": + def __getitem__(self, key: int | slice, /) -> Any | Sequence[Any]: if isinstance(key, int): return self._fields[key] return type(self)(self._fields[key]) @@ -140,47 +80,34 @@ def __iter__(self) -> Iterator[TelegramField]: def __len__(self) -> int: return len(self._fields) + def __lt__(self, other: Iterable) -> bool: + sfields = self._fields + if isinstance(other, TelegramContainer): + other = other._fields + return bool(sfields < list(other)) + def __repr__(self) -> str: cls_name = type(self).__name__ - return f"{cls_name}(ibytes={self._fields})" - - @staticmethod - def _iterify(data: None | TelegramBytesType = None) -> Iterator: - return iter(data or []) - - def as_bytes(self) -> bytes: - """Return bytes as Python `bytes`.""" - - return bytes(self.as_ints()) - - def as_ints(self) -> list[int]: - """Return bytes as a list of integers.""" + return f"{cls_name}({self._fields})" - return [field.byte for field in self._fields] + def __reversed__(self) -> Iterator[TelegramField]: + yield from reversed(self._fields) + def index(self, value: Any, start: int = 0, stop: None | int = None) -> int: + """Return the first index of the value. -def extract_bytes(it: Iterable) -> list[int]: - """Return the list of integers from an iterable object. + Raises + ------ + ValueError + if the value is not present - Notes - ----- - The items are validated except `TelegramField`s. - - Parameters - ---------- - it : Iterable - - Raises - ------ - MbusValidationError: - if any item is not a byte. - - Returns - ------- - list[int] - """ + Returns + ------- + int + """ + stop = sys.maxsize if stop is None else stop + return self._fields.index(TelegramField(value), start, stop) - return [ - item.byte if isinstance(item, TelegramField) else _validate_byte(item) - for item in it - ] + def count(self, value: Any) -> int: + """Return the number of occurrences of value.""" + return self._fields.count(TelegramField(value)) diff --git a/src/pymbus/telegrams/blocks.py b/src/pymbus/telegrams/blocks.py index 09918a5..4c60a8f 100644 --- a/src/pymbus/telegrams/blocks.py +++ b/src/pymbus/telegrams/blocks.py @@ -1,4 +1,4 @@ -"""M-Bus Telegram Blocks module.""" +"""M-Bus Telegram Blocks.""" from collections.abc import Iterator @@ -6,7 +6,6 @@ from pymbus.telegrams.base import ( TelegramByteIterableType, TelegramContainer, - extract_bytes, ) from pymbus.telegrams.fields import DataInformationField as DIF from pymbus.telegrams.fields import DataInformationFieldExtension as DIFE @@ -39,24 +38,28 @@ class DataInformationBlock(TelegramBlock): MAX_DIFE_FRAMES = 10 - def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = self._iterify(ibytes) + def __init__( + self, + ibytes: None | TelegramByteIterableType = None, + ) -> None: + it = iter(ibytes if ibytes else []) try: blocks = self._parse(it) except StopIteration as e: - msg = f"{ibytes!r} is invalid" + msg = f"{ibytes!r} has invalid length" raise MBusLengthError(msg) from e - dif = blocks[0] - difes = blocks[1] - super().__init__(ibytes=extract_bytes([dif] + difes)) + dif: DIF = blocks[0] + difes: list[DIFE] = blocks[1] + + super().__init__(ibytes=list(map(int, [dif] + difes))) # type: ignore self._dif = dif self._difes = difes def _parse(self, it: Iterator) -> tuple[DIF, list[DIFE]]: value: int = int(next(it)) - dif = DIF(byte=value) + dif = DIF(value) if not dif.extension: return (dif, []) @@ -65,7 +68,7 @@ def _parse(self, it: Iterator) -> tuple[DIF, list[DIFE]]: dife_counter = 1 while True: value = int(next(it)) - dife = DIFE(byte=value) + dife = DIFE(value) difes.append(dife) if not dife.extension: break @@ -81,13 +84,11 @@ def _parse(self, it: Iterator) -> tuple[DIF, list[DIFE]]: @property def dif(self) -> DIF: """Return the DIF field.""" - return self._dif @property def difes(self) -> list[DIFE]: """Return the list of DIFE fields.""" - return self._difes @@ -113,23 +114,23 @@ class ValueInformationBlock(TelegramBlock): MAX_VIFE_FRAMES = 10 def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = self._iterify(ibytes) + it = iter(ibytes if ibytes else []) try: blocks = self._parse(it) except StopIteration as e: - msg = f"{ibytes!r} is invalid" + msg = f"{ibytes!r} has invalid length" raise MBusLengthError(msg) from e vif = blocks[0] vifes = blocks[1] - super().__init__(ibytes=extract_bytes([vif] + vifes)) + super().__init__(ibytes=map(int, [vif] + vifes)) # type: ignore self._vif = vif self._vifes = vifes def _parse(self, it: Iterator) -> tuple[VIF, list[VIFE]]: value: int = int(next(it)) - vif = VIF(byte=value) + vif = VIF(value) if not vif.extension: return (vif, []) @@ -138,10 +139,11 @@ def _parse(self, it: Iterator) -> tuple[VIF, list[VIFE]]: vife_counter = 1 while True: value = int(next(it)) - vife = VIFE(byte=value) + vife = VIFE(value) vifes.append(vife) if not vife.extension: break + vife_counter += 1 if vife_counter == max_frame: if vife.extension: @@ -153,11 +155,9 @@ def _parse(self, it: Iterator) -> tuple[VIF, list[VIFE]]: @property def vif(self) -> VIF: """Return the VIF field.""" - return self._vif @property def vifes(self) -> list[VIFE]: """Return the list of VIFE fields.""" - return self._vifes diff --git a/src/pymbus/telegrams/fields.py b/src/pymbus/telegrams/fields.py deleted file mode 100644 index af43ee5..0000000 --- a/src/pymbus/telegrams/fields.py +++ /dev/null @@ -1,372 +0,0 @@ -"""M-Bus Telegram Fields module.""" - -from enum import IntEnum - -from pymbus.telegrams.base import TelegramField - -AF_UNCONFIGURED_SLAVE_BYTE = 0x00 -AF_SLAVE_MIN_RANGE_VALUE_BYTE = 0x01 -AF_SLAVE_MAX_RANGE_VALUE_BYTE = 0xFA -AF_BROADCAST_ALL_SLAVES_REPLY_BYTE = 0xFE -AF_BROADCAST_NO_SLAVE_REPLIES_BYTE = 0xFF -AF_NETWORK_LAYER_BYTE = 0xFD - - -class AddressField(TelegramField): - """The "Address (A) Field" class. - - The address field serves to address the recipient in the calling direction, - and to identify the sender of information in the receiving direction. - - The size of this field is one byte, - and can therefore take values from 0 to 255. - The addresses 1 to 250 can be allocated to the individual slaves, - up to a maximum of 250. - Unconfigured slaves are given the address 0 at manufacture, - and as a rule are allocated one of these addresses - when connected to the M-Bus. - - The addresses 254 (FEh) and 255 (FFh) are used to transmit - information to all participants (Broadcast). - With address 255, none of the slaves reply, - and with address 254 all slaves reply with their own addresses. - The latter case naturally results in collisions - when two or more slaves are connected, - and should only be used for test purposes. - - The address 253 (FDh) indicates that the adressing - has been performed in the Network Layer - instead of Data Link Layer. - - The remaining addresses 251 and 252 have been kept for future applications. - """ - - def is_configured_slave(self) -> bool: - return ( - AF_SLAVE_MIN_RANGE_VALUE_BYTE - <= self.byte - <= AF_SLAVE_MAX_RANGE_VALUE_BYTE - ) - - def is_unconfigured_slave(self) -> bool: - return self.byte == AF_UNCONFIGURED_SLAVE_BYTE - - def is_slave(self) -> bool: - return self.is_configured_slave() or self.is_unconfigured_slave() - - def is_broadcast_all_reply(self) -> bool: - return self.byte == AF_BROADCAST_ALL_SLAVES_REPLY_BYTE - - def is_broadcast_no_replies(self) -> bool: - return self.byte == AF_BROADCAST_NO_SLAVE_REPLIES_BYTE - - def is_broadcast(self) -> bool: - return self.is_broadcast_all_reply() or self.is_broadcast_no_replies() - - def is_network_layer(self) -> bool: - return self.byte == AF_NETWORK_LAYER_BYTE - - -CF_FUNCTION_CODE_MASK = 0x0F -CF_FCV_OR_DFC_MASK = 0x10 -CF_FCB_OR_ACD_MASK = 0x20 -CF_DIRECTION_MASK = 0x40 - - -class ControlField(TelegramField): - """The "Control (C) Field" class. - - The "Control Field" scheme: - ------------------------------------------------------------ - | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | - --------------------+---+- +-----+-----+----+----+----+----+ - | calling direction | 0 | 1 | FCB | FCV | F3 | F2 | F1 | F0 | - --------------------+---+---+---- +---- +----+----+----+----+ - | reply direction | 0 | 0 | ACD | DFC | F3 | F2 | F1 | F0 | - ------------------------------------------------------------- - - The highest value (most significant) bit is reserved for future functions, - and at present is allocated the value 0. - - The bit number 6 is used to specify the direction of data flow. - If 1, than it is interpreted as a calling direction, - else, a reply direction. - - For calling direction. - The bit 5 -> FCB = Frame Count Bit. - The bit 4 -> FCV = Frame Count Valid (Bit). - - The FCB indicates successful transmission procedures - in order to avoid transmission loss or multiplication. - If the expected reply is missing or reception is faulty, - the master sends again the same telegram with an identical FCB, - and the slave replies with the same telegram as previously. - The master indicates with a 1 in the FCV bit, that the FCB is used. - Otherwise, the slave should ignore the FCB. - - For reply direction: - The bit 5 -> ACD = Access Demand. - The bit 4 -> DFC = Data Flow Control. - - In the replying direction, both these bits can undertake other tasks. - The DFC serves to control the flow of data, - in that the slave with a DFC=1 - indicates that it can accept no further data. - With an ACD bit (access demand) with a value of 1, - the slave shows that it wants to transmit Class 1 data. - The master should then send it a command to request Class 1 data. - Such Class 1 data is of higher priority, - which (in contrast to Class 2 data) - should be transmitted as soon as possible. - The support of Class 1 data and the bits DFC and ADC - is not required by the standard. - - The bits 0 to 3 of the control field - code the true function or action of the message. - """ - - def __init__(self, byte: int) -> None: - super().__init__(byte) - - self._code = byte & CF_FUNCTION_CODE_MASK - self._fcv_or_dfc = int((byte & CF_FCV_OR_DFC_MASK) != 0) - self._fcb_or_acd = int((byte & CF_FCB_OR_ACD_MASK) != 0) - self._direction = int((byte & CF_DIRECTION_MASK) != 0) - - @property - def code(self) -> int: - """Return the action/function code value.""" - - return self._code - - @property - def fcb(self) -> int: - """Return the "Frame Count Bit" (FCB) value.""" - - if not self.is_calling_direction(): - msg = f'the {self} has no "Frame Count Bit" (FCB)' - raise AttributeError(msg) - return self._fcb_or_acd - - @property - def fcv(self) -> int: - """Return the "Frame Count Valid" (FCV) bit.""" - - if not self.is_calling_direction(): - msg = f'the {self} has no "Frame Count Valid" (FCV) bit' - raise AttributeError(msg) - return self._fcv_or_dfc - - @property - def acd(self) -> int: - """Return the "Access Demand" (ACD) bit.""" - - if not self.is_reply_direction(): - msg = f'the {self} has no "Access Demand" (ACD) bit' - raise AttributeError(msg) - return self._fcb_or_acd - - @property - def dfc(self) -> int: - """Return the "Data Flow Control" (DFC) value.""" - - if not self.is_reply_direction(): - msg = f'the {self} has no "Data Flow Control" (DFC) bit' - raise AttributeError(msg) - return self._fcv_or_dfc - - @property - def direction(self) -> int: - """Return the direction bit value.""" - - return self._direction - - def is_calling_direction(self) -> bool: - return self._direction == 1 - - def is_reply_direction(self) -> bool: - return self._direction == 0 - - -class ControlInformationField(TelegramField): - """The "Control Information (CI) Field" class. - - The CI-Field codes the type and sequence of application data - to be transmitted in this frame. - The EN1434-3 defines two possible data sequences in multibyte records. - The bit two (counting begins with bit 0, value 4), - which is called M bit or Mode bit, in the CI field gives - an information about the used byte sequence in multibyte data structures. - If the Mode bit is not set (Mode 1), - the least significant byte of a multibyte record is transmitted first, - otherwise (Mode 2) the most significant byte. - - The Usergroup recommends to use only the Mode 1 in future applications. - """ - - -class DataInformationField(TelegramField): - """Data Information Field (DIF) class. - - The structure of the DIF: - -------------------------------------------------------------- - | bit | 7 | 6 | 5 4 | 3 2 1 0 | - +------+-----------+--------------------+----------+---------+ - | desc | extension | storage number LSB | function | data | - -------------------------------------------------------------- - """ - - DATA_FIELD_MASK = 0x0F # 0b0000_1111 - FUNCTION_FIELD_MASK = 0x30 # 0b0011_0000 - STORAGE_NUMBER_LSB_MASK = 0x40 # 0b0100_0000 - EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 - - def __init__(self, byte: int) -> None: - super().__init__(byte) - - self._data = byte & self.DATA_FIELD_MASK - self._func = (byte & self.FUNCTION_FIELD_MASK) >> 4 - self._sn_lsb = int((byte & self.STORAGE_NUMBER_LSB_MASK) != 0) - self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) - - @property - def data(self) -> int: - return self._data - - @property - def function(self) -> int: - return self._func - - @property - def storage_number_lsb(self) -> int: - return self._sn_lsb - - @property - def extension(self) -> int: - return self._ext - - -# BCD = Type A. Integer = Type B. Real = Type H. -class DataFieldCode(IntEnum): - no_data = 0b0000 - int8 = 0b0001 - int16 = 0b0010 - int24 = 0b0011 - int32 = 0b0100 - real32 = 0b0101 - int48 = 0b0110 - int64 = 0b0111 - readout = 0b1000 - bcd2 = 0b1001 - bcd4 = 0b1010 - bcd6 = 0b1011 - bcd8 = 0b1100 - varlen = 0b1101 - bcd12 = 0b1110 - special_func = 0b1111 - - -class FunctionFieldCode(IntEnum): - instantaneous = 0b00 - maximum = 0b01 - minimum = 0b10 - error = 0b11 - - -class DataInformationFieldExtension(TelegramField): - """Data Information Field Extension (DIFE) class. - - The structure of the DIFE: - ---------------------------------------------------------------- - | bit | 7 | 6 | 5 4 | 3 2 1 0 | - +------+-----------+---------------+----------+----------------+ - | desc | extension | device (unit) | tariff | storage number | - ---------------------------------------------------------------- - """ - - STORAGE_NUMBER_MASK = 0x0F # 0b0000_1111 - TARIFF_MASK = 0x30 # 0b0011_0000 - DEVICE_UNIT_MASK = 0x40 # 0b0100_0000 - EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 - - def __init__(self, byte: int) -> None: - super().__init__(byte) - - self._storage_number = byte & self.STORAGE_NUMBER_MASK - self._tariff = (byte & self.TARIFF_MASK) >> 4 - self._device_unit = int((byte & self.DEVICE_UNIT_MASK) != 0) - self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) - - @property - def storage_number(self) -> int: - return self._storage_number - - @property - def tariff(self) -> int: - return self._tariff - - @property - def device_unit(self) -> int: - return self._device_unit - - @property - def extension(self) -> int: - return self._ext - - -class ValueInformationField(TelegramField): - """Value Information Field (VIF) class. - - The structure of the VIF: - -------------------------------------------------- - | bit | 7 | 6 5 4 3 2 1 0 | - +------+-----------+-----------------------------+ - | desc | extension | unit and multiplier (value) | - -------------------------------------------------- - """ - - UNIT_AND_MULTIPLIER_MASK = 0x7F # 0b0111_1111 - EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 - - def __init__(self, byte: int) -> None: - super().__init__(byte) - - self._data = byte & self.UNIT_AND_MULTIPLIER_MASK - self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) - - @property - def unit(self) -> int: - return self._data - - @property - def extension(self) -> int: - return self._ext - - -class ValueInformationFieldExtension(TelegramField): - """Value Information Field Extension (VIFE) class. - - The structure of the VIFE (the same as VIF): - -------------------------------------------------- - | bit | 7 | 6 5 4 3 2 1 0 | - +------+-----------+-----------------------------+ - | desc | extension | unit and multiplier (value) | - -------------------------------------------------- - """ - - UNIT_AND_MULTIPLIER_MASK = 0x7F # 0b0111_1111 - EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 - - def __init__(self, byte: int) -> None: - super().__init__(byte) - - self._data = byte & self.UNIT_AND_MULTIPLIER_MASK - self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) - - @property - def unit(self) -> int: - return self._data - - @property - def extension(self) -> int: - return self._ext diff --git a/src/pymbus/telegrams/fields/__init__.py b/src/pymbus/telegrams/fields/__init__.py new file mode 100644 index 0000000..a558b53 --- /dev/null +++ b/src/pymbus/telegrams/fields/__init__.py @@ -0,0 +1,21 @@ +from pymbus.telegrams.fields.address import AddressField +from pymbus.telegrams.fields.control import ControlField +from pymbus.telegrams.fields.control_info import ControlInformationField +from pymbus.telegrams.fields.data import ( + DataInformationField, + DataInformationFieldExtension, +) +from pymbus.telegrams.fields.value import ( + ValueInformationField, + ValueInformationFieldExtension, +) + +__all__ = [ + "AddressField", + "ControlField", + "ControlInformationField", + "DataInformationField", + "DataInformationFieldExtension", + "ValueInformationField", + "ValueInformationFieldExtension", +] diff --git a/src/pymbus/telegrams/fields/address.py b/src/pymbus/telegrams/fields/address.py new file mode 100644 index 0000000..889a4bf --- /dev/null +++ b/src/pymbus/telegrams/fields/address.py @@ -0,0 +1,65 @@ +"""M-Bus Telegram Address Field module.""" + +from pymbus.telegrams.base import TelegramField + + +class AddressField(TelegramField): + """The "Address (A) Field" class. + + The address field serves to address the recipient in the calling direction, + and to identify the sender of information in the receiving direction. + + The size of this field is one byte, + and can therefore take values from 0 to 255. + The addresses 1 to 250 can be allocated to the individual slaves, + up to a maximum of 250. + Unconfigured slaves are given the address 0 at manufacture, + and as a rule are allocated one of these addresses + when connected to the M-Bus. + + The addresses 254 (FEh) and 255 (FFh) are used to transmit + information to all participants (Broadcast). + With address 255, none of the slaves reply, + and with address 254 all slaves reply with their own addresses. + The latter case naturally results in collisions + when two or more slaves are connected, + and should only be used for test purposes. + + The address 253 (FDh) indicates that the adressing + has been performed in the Network Layer + instead of Data Link Layer. + + The remaining addresses 251 and 252 have been kept for future applications. + """ + + AF_BROADCAST_ALL_SLAVES_REPLY_BYTE = 0xFE + AF_BROADCAST_NO_SLAVE_REPLIES_BYTE = 0xFF + AF_NETWORK_LAYER_BYTE = 0xFD + AF_SLAVE_MAX_RANGE_VALUE_BYTE = 0xFA + AF_SLAVE_MIN_RANGE_VALUE_BYTE = 0x01 + AF_SLAVE_UNCONFIGURED_BYTE = 0x00 + + def is_configured_slave(self) -> bool: + return ( + self.AF_SLAVE_MIN_RANGE_VALUE_BYTE + <= self._byte + <= self.AF_SLAVE_MAX_RANGE_VALUE_BYTE + ) + + def is_unconfigured_slave(self) -> bool: + return self._byte == self.AF_SLAVE_UNCONFIGURED_BYTE + + def is_slave(self) -> bool: + return self.is_configured_slave() or self.is_unconfigured_slave() + + def is_broadcast_all_reply(self) -> bool: + return self._byte == self.AF_BROADCAST_ALL_SLAVES_REPLY_BYTE + + def is_broadcast_no_replies(self) -> bool: + return self._byte == self.AF_BROADCAST_NO_SLAVE_REPLIES_BYTE + + def is_broadcast(self) -> bool: + return self.is_broadcast_all_reply() or self.is_broadcast_no_replies() + + def is_network_layer(self) -> bool: + return self._byte == self.AF_NETWORK_LAYER_BYTE diff --git a/src/pymbus/telegrams/fields/control.py b/src/pymbus/telegrams/fields/control.py new file mode 100644 index 0000000..c1af1e0 --- /dev/null +++ b/src/pymbus/telegrams/fields/control.py @@ -0,0 +1,119 @@ +"""M-Bus Telegram Control Field module.""" + +from pymbus.telegrams.base import TelegramField + + +class ControlField(TelegramField): + """The "Control (C) Field" class. + + The "Control Field" scheme: + ------------------------------------------------------------ + | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + --------------------+---+- +-----+-----+----+----+----+----+ + | calling direction | 0 | 1 | FCB | FCV | F3 | F2 | F1 | F0 | + --------------------+---+---+---- +---- +----+----+----+----+ + | reply direction | 0 | 0 | ACD | DFC | F3 | F2 | F1 | F0 | + ------------------------------------------------------------- + + The highest value (most significant) bit is reserved for future functions, + and at present is allocated the value 0. + + The bit number 6 is used to specify the direction of data flow. + If 1, than it is interpreted as a calling direction, + else, a reply direction. + + For calling direction. + The bit 5 -> FCB = Frame Count Bit. + The bit 4 -> FCV = Frame Count Valid (Bit). + + The FCB indicates successful transmission procedures + in order to avoid transmission loss or multiplication. + If the expected reply is missing or reception is faulty, + the master sends again the same telegram with an identical FCB, + and the slave replies with the same telegram as previously. + The master indicates with a 1 in the FCV bit, that the FCB is used. + Otherwise, the slave should ignore the FCB. + + For reply direction: + The bit 5 -> ACD = Access Demand. + The bit 4 -> DFC = Data Flow Control. + + In the replying direction, both these bits can undertake other tasks. + The DFC serves to control the flow of data, + in that the slave with a DFC=1 + indicates that it can accept no further data. + With an ACD bit (access demand) with a value of 1, + the slave shows that it wants to transmit Class 1 data. + The master should then send it a command to request Class 1 data. + Such Class 1 data is of higher priority, + which (in contrast to Class 2 data) + should be transmitted as soon as possible. + The support of Class 1 data and the bits DFC and ADC + is not required by the standard. + + The bits 0 to 3 of the control field + code the true function or action of the message. + """ + + CF_DIRECTION_MASK = 0x40 + CF_FUNCTION_CODE_MASK = 0x0F + CF_FCB_OR_ACD_MASK = 0x20 + CF_FCV_OR_DFC_MASK = 0x10 + + def __init__(self, byte: int) -> None: + super().__init__(byte) + + self._code = byte & self.CF_FUNCTION_CODE_MASK + self._fcv_or_dfc = int((byte & self.CF_FCV_OR_DFC_MASK) != 0) + self._fcb_or_acd = int((byte & self.CF_FCB_OR_ACD_MASK) != 0) + self._direction = int((byte & self.CF_DIRECTION_MASK) != 0) + + @property + def code(self) -> int: + """Return the action/function code value.""" + return self._code + + @property + def fcb(self) -> int: + """Return the "Frame Count Bit" (FCB) value.""" + if not self.is_calling_direction(): + msg = f'the {self} has no "Frame Count Bit" (FCB)' + raise AttributeError(msg) + return self._fcb_or_acd + + @property + def fcv(self) -> int: + """Return the "Frame Count Valid" (FCV) bit.""" + if not self.is_calling_direction(): + msg = f'the {self} has no "Frame Count Valid" (FCV) bit' + raise AttributeError(msg) + return self._fcv_or_dfc + + @property + def acd(self) -> int: + """Return the "Access Demand" (ACD) bit.""" + if not self.is_reply_direction(): + msg = f'the {self} has no "Access Demand" (ACD) bit' + raise AttributeError(msg) + return self._fcb_or_acd + + @property + def dfc(self) -> int: + """Return the "Data Flow Control" (DFC) value.""" + if not self.is_reply_direction(): + msg = f'the {self} has no "Data Flow Control" (DFC) bit' + raise AttributeError(msg) + return self._fcv_or_dfc + + @property + def direction(self) -> int: + """Return the direction bit value.""" + return self._direction + + def is_calling_direction(self) -> bool: + """Return True if the 6th bit is 1.""" + return bool(self._direction) + + def is_reply_direction(self) -> bool: + """Return True if the 6th bit is 0.""" + return not self._direction diff --git a/src/pymbus/telegrams/fields/control_info.py b/src/pymbus/telegrams/fields/control_info.py new file mode 100644 index 0000000..a4cb301 --- /dev/null +++ b/src/pymbus/telegrams/fields/control_info.py @@ -0,0 +1,20 @@ +"""M-Bus Telegram Control Information Field module.""" + +from pymbus.telegrams.base import TelegramField + + +class ControlInformationField(TelegramField): + """The "Control Information (CI) Field" class. + + The CI-Field codes the type and sequence of application data + to be transmitted in this frame. + The EN1434-3 defines two possible data sequences in multibyte records. + The bit two (counting begins with bit 0, value 4), + which is called M bit or Mode bit, in the CI field gives + an information about the used byte sequence in multibyte data structures. + If the Mode bit is not set (Mode 1), + the least significant byte of a multibyte record is transmitted first, + otherwise (Mode 2) the most significant byte. + + The Usergroup recommends to use only the Mode 1 in future applications. + """ diff --git a/src/pymbus/telegrams/fields/data.py b/src/pymbus/telegrams/fields/data.py new file mode 100644 index 0000000..595eefc --- /dev/null +++ b/src/pymbus/telegrams/fields/data.py @@ -0,0 +1,85 @@ +"""M-Bus Telegram Data Information Field module.""" + +from pymbus.telegrams.base import TelegramField + + +class DataInformationField(TelegramField): + """Data Information Field (DIF) class. + + The structure of the DIF: + -------------------------------------------------------------- + | bit | 7 | 6 | 5 4 | 3 2 1 0 | + +------+-----------+--------------------+----------+---------+ + | desc | extension | storage number LSB | function | data | + -------------------------------------------------------------- + """ + + DATA_FIELD_MASK = 0x0F # 0b0000_1111 + FUNCTION_FIELD_MASK = 0x30 # 0b0011_0000 + EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 + STORAGE_NUMBER_LSB_MASK = 0x40 # 0b0100_0000 + + def __init__(self, byte: int) -> None: + super().__init__(byte) + + self._data = byte & self.DATA_FIELD_MASK + self._func = (byte & self.FUNCTION_FIELD_MASK) >> 4 + self._sn_lsb = int((byte & self.STORAGE_NUMBER_LSB_MASK) != 0) + self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) + + @property + def data(self) -> int: + return self._data + + @property + def function(self) -> int: + return self._func + + @property + def storage_number_lsb(self) -> int: + return self._sn_lsb + + @property + def extension(self) -> int: + return self._ext + + +class DataInformationFieldExtension(TelegramField): + """Data Information Field Extension (DIFE) class. + + The structure of the DIFE: + ---------------------------------------------------------------- + | bit | 7 | 6 | 5 4 | 3 2 1 0 | + +------+-----------+---------------+----------+----------------+ + | desc | extension | device (unit) | tariff | storage number | + ---------------------------------------------------------------- + """ + + DEVICE_UNIT_MASK = 0x40 # 0b0100_0000 + EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 + STORAGE_NUMBER_MASK = 0x0F # 0b0000_1111 + TARIFF_MASK = 0x30 # 0b0011_0000 + + def __init__(self, byte: int) -> None: + super().__init__(byte) + + self._storage_number = byte & self.STORAGE_NUMBER_MASK + self._tariff = (byte & self.TARIFF_MASK) >> 4 + self._device_unit = int((byte & self.DEVICE_UNIT_MASK) != 0) + self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) + + @property + def storage_number(self) -> int: + return self._storage_number + + @property + def tariff(self) -> int: + return self._tariff + + @property + def device_unit(self) -> int: + return self._device_unit + + @property + def extension(self) -> int: + return self._ext diff --git a/src/pymbus/telegrams/fields/value.py b/src/pymbus/telegrams/fields/value.py new file mode 100644 index 0000000..0d9dbae --- /dev/null +++ b/src/pymbus/telegrams/fields/value.py @@ -0,0 +1,61 @@ +"""M-Bus Telegram Value Information Field module.""" + +from pymbus.telegrams.base import TelegramField + + +class ValueInformationField(TelegramField): + """Value Information Field (VIF) class. + + The structure of the VIF: + -------------------------------------------------- + | bit | 7 | 6 5 4 3 2 1 0 | + +------+-----------+-----------------------------+ + | desc | extension | unit and multiplier (value) | + -------------------------------------------------- + """ + + UNIT_AND_MULTIPLIER_MASK = 0x7F # 0b0111_1111 + EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 + + def __init__(self, byte: int) -> None: + super().__init__(byte) + + self._data = byte & self.UNIT_AND_MULTIPLIER_MASK + self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) + + @property + def unit(self) -> int: + return self._data + + @property + def extension(self) -> int: + return self._ext + + +class ValueInformationFieldExtension(TelegramField): + """Value Information Field Extension (VIFE) class. + + The structure of the VIFE (the same as VIF): + -------------------------------------------------- + | bit | 7 | 6 5 4 3 2 1 0 | + +------+-----------+-----------------------------+ + | desc | extension | unit and multiplier (value) | + -------------------------------------------------- + """ + + UNIT_AND_MULTIPLIER_MASK = 0x7F # 0b0111_1111 + EXTENSION_BIT_MASK = 0x80 # 0b1000_0000 + + def __init__(self, byte: int) -> None: + super().__init__(byte) + + self._data = byte & self.UNIT_AND_MULTIPLIER_MASK + self._ext = int((byte & self.EXTENSION_BIT_MASK) != 0) + + @property + def unit(self) -> int: + return self._data + + @property + def extension(self) -> int: + return self._ext diff --git a/src/pymbus/telegrams/frames.py b/src/pymbus/telegrams/frames.py index 8062999..5adcd45 100644 --- a/src/pymbus/telegrams/frames.py +++ b/src/pymbus/telegrams/frames.py @@ -60,21 +60,21 @@ def from_byte(cls, byte: int) -> "SingleFrame": ------- Self """ - return SingleFrame([byte]) def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - if ibytes is None: - ibytes = [TelegramField(ACK_BYTE)] + fields = ibytes if ibytes else [TelegramField(ACK_BYTE)] - fields = list(TelegramContainer(ibytes=ibytes)) - if len(fields) != 1: - msg = f"accepts only {ACK_BYTE}" - raise MBusLengthError(msg) + it = iter(fields) + try: + field = TelegramField(int(next(it))) + except StopIteration as e: + msg = f"empty byte sequence: {ibytes!r}" + raise MBusLengthError(msg) from e - if (byte := fields[0].byte) != ACK_BYTE: - msg = f"{byte} != {ACK_BYTE}" - raise MBusValidationError(msg) + if field != ACK_BYTE: + msg = f"{int(field)} != {ACK_BYTE}" + raise MBusValidationError(msg) from None super().__init__(ibytes=fields) @@ -99,16 +99,16 @@ class ShortFrame(TelegramFrame): """ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = iter(ibytes) # type: ignore [arg-type] + it = iter(ibytes if ibytes else []) try: super().__init__(self._parse(it)) except StopIteration as e: - msg = f"{ibytes!r} are of invalid length" + msg = f"{ibytes!r} has an invalid length" raise MBusLengthError(msg) from e def _parse(self, it: Iterator) -> list[TelegramField]: start_field = TelegramField(int(next(it))) - if (byte := start_field.byte) != SHORT_FRAME_START_BYTE: + if (byte := start_field) != SHORT_FRAME_START_BYTE: msg = f"the first byte {byte!r} is an invalid start byte" raise MBusValidationError(msg) @@ -117,7 +117,7 @@ def _parse(self, it: Iterator) -> list[TelegramField]: check_sum_field = TelegramField(int(next(it))) stop_field = TelegramField(int(next(it))) - if (byte := stop_field.byte) != FRAME_STOP_BYTE: + if (byte := stop_field) != FRAME_STOP_BYTE: msg = f"the fifth byte {byte!r} is an invalid stop byte" raise MBusValidationError(msg) @@ -151,16 +151,16 @@ class ControlFrame(TelegramFrame): """ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = iter(ibytes) # type: ignore [arg-type] + it = iter(ibytes if ibytes else []) try: super().__init__(self._parse(it)) except StopIteration as e: - msg = f"{ibytes!r} are of invalid length" + msg = f"{ibytes!r} has an invalid length" raise MBusLengthError(msg) from e def _parse(self, it: Iterator) -> list[TelegramField]: start_field = TelegramField(int(next(it))) - if (byte := start_field.byte) != CONTROL_FRAME_START_BYTE: + if (byte := start_field) != CONTROL_FRAME_START_BYTE: msg = f"the first byte {byte!r} is invalid start byte" raise MBusValidationError(msg) @@ -168,7 +168,7 @@ def _parse(self, it: Iterator) -> list[TelegramField]: length2_field = TelegramField(int(next(it))) start2_field = TelegramField(int(next(it))) - if (byte := start2_field.byte) != CONTROL_FRAME_START_BYTE: + if (byte := start2_field) != CONTROL_FRAME_START_BYTE: msg = f"the fourth byte {byte!r} is invalid start byte" raise MBusValidationError(msg) @@ -178,7 +178,7 @@ def _parse(self, it: Iterator) -> list[TelegramField]: check_sum_field = TelegramField(int(next(it))) stop_field = TelegramField(int(next(it))) - if (byte := stop_field.byte) != FRAME_STOP_BYTE: + if (byte := stop_field) != FRAME_STOP_BYTE: msg = f"the ninth byte {byte!r} is invalid stop byte" raise MBusValidationError(msg) @@ -223,16 +223,16 @@ class LongFrame(TelegramFrame): """ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = iter(ibytes) # type: ignore [arg-type] + it = iter(ibytes if ibytes else []) try: super().__init__(self._parse(it)) except StopIteration as e: - msg = f"{ibytes!r} are of invalid length" + msg = f"{ibytes!r} has an invalid length" raise MBusLengthError(msg) from e def _parse(self, it: Iterator) -> list[TelegramField]: start_field = TelegramField(int(next(it))) - if (byte := start_field.byte) != CONTROL_FRAME_START_BYTE: + if (byte := start_field) != CONTROL_FRAME_START_BYTE: msg = f"the first byte {byte!r} is invalid start byte" raise MBusValidationError(msg) @@ -240,7 +240,7 @@ def _parse(self, it: Iterator) -> list[TelegramField]: length2_field = TelegramField(int(next(it))) start2_field = TelegramField(int(next(it))) - if (byte := start2_field.byte) != CONTROL_FRAME_START_BYTE: + if (byte := start2_field) != CONTROL_FRAME_START_BYTE: msg = f"the fourth byte {byte!r} is invalid start byte" raise MBusValidationError(msg) @@ -256,7 +256,7 @@ def _parse(self, it: Iterator) -> list[TelegramField]: check_sum_field = TelegramField(int(next(it))) stop_field = TelegramField(int(next(it))) - if (byte := stop_field.byte) != FRAME_STOP_BYTE: + if (byte := stop_field) != FRAME_STOP_BYTE: msg = f"the tenth byte {byte!r} is invalid stop byte" raise MBusValidationError(msg) diff --git a/src/pymbus/telegrams/records.py b/src/pymbus/telegrams/records.py index c63dd6f..7794ad0 100644 --- a/src/pymbus/telegrams/records.py +++ b/src/pymbus/telegrams/records.py @@ -22,7 +22,7 @@ class DataRecordHeader(TelegramContainer): """ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = iter(ibytes) # type: ignore [arg-type] + it = iter(ibytes if ibytes else []) dib = DIB(ibytes=it) vib = VIB(ibytes=it) @@ -35,13 +35,11 @@ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: @property def dib(self) -> DIB: """Return DI block.""" - return self._dib @property def vib(self) -> VIB: """Return VI block.""" - return self._vib @@ -58,7 +56,7 @@ class DataRecord(TelegramContainer): """ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: - it = iter(ibytes) # type: ignore [arg-type] + it = iter(ibytes if ibytes else []) drh = DataRecordHeader(ibytes=it) data = TelegramContainer(ibytes=it) @@ -71,11 +69,9 @@ def __init__(self, ibytes: None | TelegramByteIterableType = None) -> None: @property def drh(self) -> DataRecordHeader: """Return DataRecordHeader.""" - return self._drh @property def data(self) -> TelegramContainer: """Return data fields.""" - return self._data diff --git a/src/pymbus/utils.py b/src/pymbus/utils.py new file mode 100644 index 0000000..b8ce0ff --- /dev/null +++ b/src/pymbus/utils.py @@ -0,0 +1,29 @@ +"""Pymbus utilities.""" + +from pymbus.exceptions import MBusValidationError + + +def validate_byte(number: int) -> int: + """Return an integer if it is a byte. + + In Python, a byte must be in range(0, 256). + This is the range for an 8-bit unsigned integer. + + Parameters + ---------- + number : int + + Raises + ------ + MbusValidationError + the `number` is out of the [0, 255] segment. + + Returns + ------- + int + """ + if -1 < number < 256: + return number + + msg = f"{number} is not a valid byte" + raise MBusValidationError(msg) from None diff --git a/tests/codes/test_vif_codes.py b/tests/codes/test_vif_codes.py index fe4fab3..7dd29b8 100644 --- a/tests/codes/test_vif_codes.py +++ b/tests/codes/test_vif_codes.py @@ -1,264 +1,649 @@ import pytest -from pymbus.codes.value_info import ( - EnergyJouleVIFCode, - EnergyWattHourVIFCode, - MassKilogramVIFCode, - OnTimeVIFCode, - OperatingTimeVIFCode, - PowerJoulePerHourVIFCode, - PowerWattVIFCode, - VolumeFlowCubicMeterPerHourVIFCode, - VolumeFlowCubicMeterPerMinuteVIFCode, - VolumeFlowCubicMeterPerSecondVIFCode, - VolumeMeterCubeVIFCode, +from pymbus.codes.vif import ( + VIFCode, + VIFCodeDescription, + VIFCodeUnit, get_vif_code, ) -from pymbus.codes.value_info import ( - ValueInformationFieldCode as VIFC, -) +from pymbus.exceptions import MBusValidationError from pymbus.telegrams.fields import ValueInformationField as VIF -def _assert_vif_code( - vif: VIF, code_type: VIFC | None, multiplier: float -) -> None: - res = get_vif_code(vif) - if res is None: - msg = f"no match for {vif}" - raise ValueError(msg) +def test_bad_nonbyte_value(): + with pytest.raises(MBusValidationError): + get_vif_code(266) + - assert type(res) is code_type - assert res.multiplier == multiplier +def test_no_vif_code(): + assert get_vif_code(VIF(0b0111_1011)) is None @pytest.mark.parametrize( - ("vif", "code_type", "multiplier"), + ("vif", "coef", "desc", "unit"), [ + # Energy (Watt * hour) ( VIF(0b0000_0000), - EnergyWattHourVIFCode, 1e-3, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b0000_0111), - EnergyWattHourVIFCode, - 1e4, - ), - ( - VIF(0b1000_0000), - EnergyWattHourVIFCode, - 1e-3, + VIF(0b0000_0001), + 1e-2, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b1000_0111), - EnergyWattHourVIFCode, - 1e4, + VIF(0b0000_0010), + 1e-1, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b0000_1000), - EnergyJouleVIFCode, + VIF(0b0000_0011), 1e0, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b0000_1111), - EnergyJouleVIFCode, - 1e7, + VIF(0b0000_0100), + 1e1, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b1000_1000), - EnergyJouleVIFCode, - 1e0, + VIF(0b0000_0101), + 1e2, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), ( - VIF(0b1000_1111), - EnergyJouleVIFCode, - 1e7, + VIF(0b0000_0110), + 1e3, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, ), - ], -) -def test_energy_vifcodes(vif: VIF, code_type: VIFC | None, multiplier: float): - _assert_vif_code(vif, code_type, multiplier) - - -@pytest.mark.parametrize( - ("vif", "code_type", "multiplier"), - [ + ( + VIF(0b0000_0111), + 1e4, + VIFCodeDescription.energy, + VIFCodeUnit.watt_hour, + ), + # Energy (Joule) + (VIF(0b0000_1000), 1e0, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1001), 1e1, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1010), 1e2, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1011), 1e3, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1100), 1e4, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1101), 1e5, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1110), 1e6, VIFCodeDescription.energy, VIFCodeUnit.joule), + (VIF(0b0000_1111), 1e7, VIFCodeDescription.energy, VIFCodeUnit.joule), + # Volume (Meter cubic) ( VIF(0b0001_0000), - VolumeMeterCubeVIFCode, 1e-6, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), ( - VIF(0b0001_0111), - VolumeMeterCubeVIFCode, - 1e1, + VIF(0b0001_0001), + 1e-5, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), ( - VIF(0b1001_0000), - VolumeMeterCubeVIFCode, - 1e-6, - ), - ( - VIF(0b1001_0111), - VolumeMeterCubeVIFCode, - 1e1, + VIF(0b0001_0010), + 1e-4, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), - ], -) -def test_volume_vifcodes(vif: VIF, code_type: VIFC | None, multiplier: float): - _assert_vif_code(vif, code_type, multiplier) - - -@pytest.mark.parametrize( - ("vif", "code_type", "multiplier"), - [ ( - VIF(0b0001_1000), - MassKilogramVIFCode, + VIF(0b0001_0011), 1e-3, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), ( - VIF(0b0001_1111), - MassKilogramVIFCode, - 1e4, - ), - ( - VIF(0b1001_1000), - MassKilogramVIFCode, - 1e-3, + VIF(0b0001_0100), + 1e-2, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), ( - VIF(0b1001_1111), - MassKilogramVIFCode, - 1e4, + VIF(0b0001_0101), + 1e-1, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), - ], -) -def test_mass_vifcodes(vif: VIF, code_type: VIFC | None, multiplier: float): - _assert_vif_code(vif, code_type, multiplier) - - -@pytest.mark.parametrize( - ("vif", "code_type", "unit"), - [ ( - VIF(0b0010_0000), - OnTimeVIFCode, - "second", + VIF(0b0001_0110), + 1e0, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, ), ( - VIF(0b0010_0001), - OnTimeVIFCode, - "minute", - ), + VIF(0b0001_0111), + 1e1, + VIFCodeDescription.volume, + VIFCodeUnit.meter_cubic, + ), + # Mass (Kilogram) + (VIF(0b0001_1000), 1e-3, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1001), 1e-2, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1010), 1e-1, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1011), 1e0, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1100), 1e1, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1101), 1e2, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1110), 1e3, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + (VIF(0b0001_1111), 1e4, VIFCodeDescription.mass, VIFCodeUnit.kilogram), + # On Time (time parts -> days, hours, minutes, seconds) + (VIF(0b0010_0000), 1, VIFCodeDescription.on_time, VIFCodeUnit.second), + (VIF(0b0010_0001), 60, VIFCodeDescription.on_time, VIFCodeUnit.second), ( VIF(0b0010_0010), - OnTimeVIFCode, - "hour", + 3600, + VIFCodeDescription.on_time, + VIFCodeUnit.second, ), ( VIF(0b0010_0011), - OnTimeVIFCode, - "day", + 86400, + VIFCodeDescription.on_time, + VIFCodeUnit.second, ), + # Operating Time (like On Time) ( VIF(0b0010_0100), - OperatingTimeVIFCode, - "second", + 1, + VIFCodeDescription.operating_time, + VIFCodeUnit.second, ), ( VIF(0b0010_0101), - OperatingTimeVIFCode, - "minute", + 60, + VIFCodeDescription.operating_time, + VIFCodeUnit.second, ), ( VIF(0b0010_0110), - OperatingTimeVIFCode, - "hour", + 3600, + VIFCodeDescription.operating_time, + VIFCodeUnit.second, ), ( VIF(0b0010_0111), - OperatingTimeVIFCode, - "day", + 86400, + VIFCodeDescription.operating_time, + VIFCodeUnit.second, + ), + # Power (Watt) + (VIF(0b0010_1000), 1e-3, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1001), 1e-2, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1010), 1e-1, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1011), 1e0, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1100), 1e1, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1101), 1e2, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1110), 1e3, VIFCodeDescription.power, VIFCodeUnit.watt), + (VIF(0b0010_1111), 1e4, VIFCodeDescription.power, VIFCodeUnit.watt), + # Power (Joule/hour) + ( + VIF(0b0011_0000), + 1e0, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, ), - ], -) -def test_ontime_vifcodes(vif: VIF, code_type: VIFC | None, unit: str): - res = get_vif_code(vif) - if res is None: - msg = f"no match for {vif}" - raise ValueError(msg) - - assert type(res) is code_type - assert unit == res.UNIT - - -@pytest.mark.parametrize( - ("vif", "code_type", "multiplier"), - [ ( - VIF(0b0010_1000), - PowerWattVIFCode, - 1e-3, + VIF(0b0011_0001), + 1e1, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, ), ( - VIF(0b0010_1111), - PowerWattVIFCode, + VIF(0b0011_0010), + 1e2, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, + ), + ( + VIF(0b0011_0011), + 1e3, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, + ), + ( + VIF(0b0011_0100), 1e4, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, ), ( - VIF(0b0011_0000), - PowerJoulePerHourVIFCode, - 1, + VIF(0b0011_0101), + 1e5, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, + ), + ( + VIF(0b0011_0110), + 1e6, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, ), ( VIF(0b0011_0111), - PowerJoulePerHourVIFCode, 1e7, + VIFCodeDescription.power, + VIFCodeUnit.joule_per_hour, ), - ], -) -def test_power_vifcodes(vif: VIF, code_type: VIFC | None, multiplier: float): - _assert_vif_code(vif, code_type, multiplier) - - -@pytest.mark.parametrize( - ("vif", "code_type", "multiplier"), - [ + # Volume flow (m^3/hour) ( VIF(0b0011_1000), - VolumeFlowCubicMeterPerHourVIFCode, 1e-6, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1001), + 1e-5, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1010), + 1e-4, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1011), + 1e-3, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1100), + 1e-2, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1101), + 1e-1, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, + ), + ( + VIF(0b0011_1110), + 1e0, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, ), ( VIF(0b0011_1111), - VolumeFlowCubicMeterPerHourVIFCode, - 10, + 1e1, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_hour, ), + # Volume flow (m^3/min) ( VIF(0b0100_0000), - VolumeFlowCubicMeterPerMinuteVIFCode, 1e-7, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0001), + 1e-6, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0010), + 1e-5, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0011), + 1e-4, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0100), + 1e-3, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0101), + 1e-2, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, + ), + ( + VIF(0b0100_0110), + 1e-1, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, ), ( VIF(0b0100_0111), - VolumeFlowCubicMeterPerMinuteVIFCode, - 1, + 1e0, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_minute, ), + # Volume flow (m^3/min) ( VIF(0b0100_1000), - VolumeFlowCubicMeterPerSecondVIFCode, 1e-9, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1001), + 1e-8, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1010), + 1e-7, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1011), + 1e-6, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1100), + 1e-5, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1101), + 1e-4, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + ( + VIF(0b0100_1110), + 1e-3, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, ), ( VIF(0b0100_1111), - VolumeFlowCubicMeterPerSecondVIFCode, 1e-2, + VIFCodeDescription.volume_flow, + VIFCodeUnit.meter_cubic_per_second, + ), + # Mass flow (kg/h) + ( + VIF(0b0101_0000), + 1e-3, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0001), + 1e-2, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0010), + 1e-1, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0011), + 1e0, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0100), + 1e1, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0101), + 1e2, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0110), + 1e3, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + ( + VIF(0b0101_0111), + 1e4, + VIFCodeDescription.mass_flow, + VIFCodeUnit.kilogram_per_hour, + ), + # Flow Temperature (C) + ( + VIF(0b0101_1000), + 1e-3, + VIFCodeDescription.flow_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1001), + 1e-2, + VIFCodeDescription.flow_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1010), + 1e-1, + VIFCodeDescription.flow_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1011), + 1e0, + VIFCodeDescription.flow_temp, + VIFCodeUnit.celsius, + ), + # Return Temperature (C) + ( + VIF(0b0101_1100), + 1e-3, + VIFCodeDescription.return_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1101), + 1e-2, + VIFCodeDescription.return_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1110), + 1e-1, + VIFCodeDescription.return_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0101_1111), + 1e0, + VIFCodeDescription.return_temp, + VIFCodeUnit.celsius, + ), + # Temperature Difference (K) + ( + VIF(0b0110_0000), + 1e-3, + VIFCodeDescription.temp_difference, + VIFCodeUnit.kelvin, + ), + ( + VIF(0b0110_0001), + 1e-2, + VIFCodeDescription.temp_difference, + VIFCodeUnit.kelvin, + ), + ( + VIF(0b0110_0010), + 1e-1, + VIFCodeDescription.temp_difference, + VIFCodeUnit.kelvin, + ), + ( + VIF(0b0110_0011), + 1e-0, + VIFCodeDescription.temp_difference, + VIFCodeUnit.kelvin, + ), + # External Temperature (C) + ( + VIF(0b0110_0100), + 1e-3, + VIFCodeDescription.external_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0110_0101), + 1e-2, + VIFCodeDescription.external_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0110_0110), + 1e-1, + VIFCodeDescription.external_temp, + VIFCodeUnit.celsius, + ), + ( + VIF(0b0110_0111), + 1e-0, + VIFCodeDescription.external_temp, + VIFCodeUnit.celsius, + ), + # Pressure (bar) + (VIF(0b0110_1000), 1e-3, VIFCodeDescription.pressure, VIFCodeUnit.bar), + (VIF(0b0110_1001), 1e-2, VIFCodeDescription.pressure, VIFCodeUnit.bar), + (VIF(0b0110_1010), 1e-1, VIFCodeDescription.pressure, VIFCodeUnit.bar), + (VIF(0b0110_1011), 1e0, VIFCodeDescription.pressure, VIFCodeUnit.bar), + # TIme Point + (VIF(0b0110_1100), 1, VIFCodeDescription.time_point, VIFCodeUnit.date), + ( + VIF(0b0110_1101), + 1, + VIFCodeDescription.time_point, + VIFCodeUnit.datetime, + ), + # H.C.A. = Heat Cost Allocator + (VIF(0b0110_1110), 1, VIFCodeDescription.hca, VIFCodeUnit.hca), + # Reserved + (VIF(0b0110_1111), 1, VIFCodeDescription.reserved, VIFCodeUnit.unknown), + # Averaging duration (in seconds) + ( + VIF(0b0111_0000), + 1, + VIFCodeDescription.averaging_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0001), + 60, + VIFCodeDescription.averaging_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0010), + 3600, + VIFCodeDescription.averaging_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0011), + 86400, + VIFCodeDescription.averaging_duration, + VIFCodeUnit.second, + ), + # Actuality duration (in seconds) + ( + VIF(0b0111_0100), + 1, + VIFCodeDescription.actuality_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0101), + 60, + VIFCodeDescription.actuality_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0110), + 3600, + VIFCodeDescription.actuality_duration, + VIFCodeUnit.second, + ), + ( + VIF(0b0111_0111), + 86400, + VIFCodeDescription.actuality_duration, + VIFCodeUnit.second, + ), + # Fabrication No + ( + VIF(0b0111_1000), + 1, + VIFCodeDescription.fabrication_no, + VIFCodeUnit.unknown, + ), + # Enhanced + (VIF(0b0111_1001), 1, VIFCodeDescription.enhanced, VIFCodeUnit.unknown), + # Bus address + ( + VIF(0b0111_1010), + 1, + VIFCodeDescription.bus_address, + VIFCodeUnit.unknown, + ), + # special purpose + (VIF(0b0111_1100), 1, VIFCodeDescription.user, VIFCodeUnit.unknown), + (VIF(0b0111_1110), 1, VIFCodeDescription.any, VIFCodeUnit.unknown), + ( + VIF(0b0111_1111), + 1, + VIFCodeDescription.manufacturer, + VIFCodeUnit.unknown, + ), + ( + VIF(0b1111_1011), + 1, + VIFCodeDescription.extension, + VIFCodeUnit.unknown, + ), + ( + VIF(0b1111_1101), + 1, + VIFCodeDescription.extension, + VIFCodeUnit.unknown, ), ], ) -def test_volume_flow_vifcodes( - vif: VIF, code_type: VIFC | None, multiplier: float +def test_vif_codes( + vif: VIF, coef: float, desc: VIFCodeDescription, unit: VIFCodeUnit ): - _assert_vif_code(vif, code_type, multiplier) + assert get_vif_code(vif) == VIFCode( + code=int(vif), + coef=coef, + desc=desc, + unit=unit, + ) diff --git a/tests/telegrams/test_base.py b/tests/telegrams/test_base.py index 7b2a0b3..7b58020 100644 --- a/tests/telegrams/test_base.py +++ b/tests/telegrams/test_base.py @@ -1,14 +1,15 @@ +from collections.abc import Iterable from contextlib import AbstractContextManager from contextlib import nullcontext as does_not_raise +from operator import and_, or_, xor +from typing import Any import pytest -from pymbus.exceptions import MBusError +from pymbus.exceptions import MBusValidationError from pymbus.telegrams.base import ( - TelegramBytesType, TelegramContainer, TelegramField, - extract_bytes, ) @@ -16,36 +17,32 @@ class TestTelegramField: @pytest.mark.parametrize( ("nbr", "expectation"), [ - (-1, pytest.raises(MBusError)), + (-1, pytest.raises(MBusValidationError)), (0, does_not_raise()), (128, does_not_raise()), (255, does_not_raise()), - (256, pytest.raises(MBusError)), + (256, pytest.raises(MBusValidationError)), ], ) def test_init(self, nbr: int, expectation: AbstractContextManager): with expectation: TelegramField(nbr) - def test_equality(self): - nbr = 5 - tf = TelegramField(nbr) - - assert tf == TelegramField(nbr) - assert tf == nbr - assert tf != "5" - - non_nbr = nbr + 1 - assert tf != TelegramField(non_nbr) - assert tf != non_nbr + def test_is_int(self): + assert isinstance(TelegramField(4), int) - def test_byte_property(self): - nbr = 128 - tf = TelegramField(nbr) + def test_comparison_ops(self): + nbr = 21 + tf = TelegramField(nbr + 1) - assert tf.byte == nbr + assert TelegramField(nbr) == nbr + assert tf != nbr + assert tf > nbr + assert tf >= nbr + assert nbr < tf + assert nbr <= tf - def test_int_conversion(self): + def test_to_int(self): nbr = 21 result = int(TelegramField(nbr)) @@ -53,16 +50,61 @@ def test_int_conversion(self): assert isinstance(result, int) assert result == nbr + def test_bitwise_ops(self): + nbr = 42 + tf = TelegramField(nbr) -class TestTelegramContainer: - def test_init_from(self): - hexstr = "00 FF" - ints = [0, 255] + for op in (and_, or_, xor): + assert op(tf, 21) == op(nbr, 21) + + assert ~tf == ~nbr + + assert (tf << 2) == (nbr << 2) + assert (tf >> 1) == (nbr >> 1) - tf1 = TelegramContainer.from_hexstring(hexstr) - tf2 = TelegramContainer.from_integers(ints) + def test_repr(self): + tf = TelegramField(255) + assert repr(tf) == "TelegramField(255)" - assert tf1 == tf2 + +class TestTelegramContainer: + @pytest.mark.parametrize( + ("hexstr", "answer", "expectation"), + [ + ("", [], does_not_raise()), + ("00 80 FF", [0, 128, 255], does_not_raise()), + ("XY", None, pytest.raises(MBusValidationError)), + ], + ) + def test_from_hexstring( + self, + hexstr: str, + answer: None | Iterable[int], + expectation: AbstractContextManager, + ): + with expectation: + tc = TelegramContainer.from_hexstring(hexstr) + assert tc == answer + + def test_init(self): + ctx = does_not_raise() + nums = [0, 128, 255] + + with ctx: + assert TelegramContainer(bytes(nums)) + with ctx: + assert TelegramContainer(bytearray(nums)) + with ctx: + assert TelegramContainer(nums) + with ctx: + assert TelegramContainer(TelegramField(num) for num in nums) + + @pytest.mark.parametrize( + ("it", "value", "answer"), + [([], 0, False), ([1], 1, True), ([2], 3, False), ([4, 5], 5, True)], + ) + def test_contains(self, it: Iterable, value: Any, answer: bool): + assert (value in TelegramContainer(it)) == answer @pytest.mark.parametrize( ("container", "key", "answer"), @@ -94,26 +136,59 @@ def test_getitem( ): assert container[key] == answer - def test_as_bytes(self): - tc = TelegramContainer.from_integers([0, 12, 23, 66]) + def test_comparison_ops(self): + assert TelegramContainer([2, 1]) == [2, 1] + assert TelegramContainer([2, 1]) != [1, 2] + assert TelegramContainer([1, 2]) < [2, 1] + assert TelegramContainer([1, 2]) <= [2, 1] + assert TelegramContainer([2, 1]) > [1] + assert TelegramContainer([2, 1]) >= [1] - bytez = bytes(tf.byte for tf in tc) + @pytest.mark.parametrize("it", [[], [1], [3, 2]]) + def test_reversed(self, it: list[int]): + tc = TelegramContainer(it) - assert tc.as_bytes() == bytez + assert tuple(reversed(tc)) == tuple(reversed(it)) - def test_as_ints(self): - ints = [0, 1, 2] - - tc = TelegramContainer(ints) + @pytest.mark.parametrize( + ("it", "value", "start", "stop", "expectation"), + [ + ([], 0, 0, 0, pytest.raises(ValueError)), + ([1, 2], 2, 0, 2, does_not_raise()), + ([1, 2], 2, 1, 2, does_not_raise()), + ([1, 2], 2, 0, 1, pytest.raises(ValueError)), + ([1, 2], 1, 1, 2, pytest.raises(ValueError)), + ], + ) + def test_index( + self, + it: Iterable, + value: Any, + *, + start: int, + stop: int, + expectation: AbstractContextManager, + ): + res, ans = None, None + seq = list(it) - assert tc.as_ints() == ints + with expectation: + res = TelegramContainer(seq).index(value, start, stop) + with expectation: + ans = seq.index(value, start, stop) -@pytest.mark.parametrize( - ("it", "answer"), - [([], []), ([0, TelegramField(byte=42), 0b1111_1111], [0, 42, 255])], -) -def test_extract_bytes(it: TelegramBytesType, answer: list[int]) -> None: - bytez = extract_bytes(it) + assert res == ans - assert bytez == answer + @pytest.mark.parametrize( + ("it", "value", "answer"), + [ + ([], 0, 0), + ([1], 0, 0), + ([2, 3], 2, 1), + ([1, 2, 1, 3], 1, 2), + ([3, 4, 1], 1, 1), + ], + ) + def test_count(self, it: Iterable, value: Any, answer: Any): + assert TelegramContainer(it).count(value) == answer diff --git a/tests/telegrams/test_blocks.py b/tests/telegrams/test_blocks.py index bb4fdff..3259937 100644 --- a/tests/telegrams/test_blocks.py +++ b/tests/telegrams/test_blocks.py @@ -13,21 +13,6 @@ class TestDIB: - @pytest.mark.parametrize( - ("ints", "expectation"), - [ - ([-1], pytest.raises(MBusValidationError)), - ([256], pytest.raises(MBusValidationError)), - ([0b0111_0000], does_not_raise()), - ([0b1000_1111, 0b0111_0000], does_not_raise()), - ], - ) - def test_init_from_integers( - self, ints: list[int], expectation: AbstractContextManager - ): - with expectation: - DIB.from_integers(ints) - @pytest.mark.parametrize( ("hexstr", "expectation"), [ @@ -91,7 +76,7 @@ def test_iterability(self): dib = DIB(it) for df, byte in zip(dib, it, strict=True): - assert df.byte == byte + assert int(df) == byte @pytest.mark.parametrize( ("it", "nbytes"), @@ -109,21 +94,6 @@ def test_non_greediness(self, it: list[int], nbytes: int): class TestVIB: - @pytest.mark.parametrize( - ("ints", "expectation"), - [ - ([-1], pytest.raises(MBusValidationError)), - ([256], pytest.raises(MBusValidationError)), - ([0b0111_0000], does_not_raise()), - ([0b1000_1111, 0b0111_0000], does_not_raise()), - ], - ) - def test_init_from_integers( - self, ints: list[int], expectation: AbstractContextManager - ): - with expectation: - VIB.from_integers(ints) - @pytest.mark.parametrize( ("hexstr", "expectation"), [ @@ -187,7 +157,7 @@ def test_iterability(self): vib = VIB(it) for df, byte in zip(vib, it, strict=True): - assert df.byte == byte + assert int(df) == byte @pytest.mark.parametrize( ("it", "nbytes"), diff --git a/tests/telegrams/test_fields.py b/tests/telegrams/test_fields.py index 53ff4cd..378ebb2 100644 --- a/tests/telegrams/test_fields.py +++ b/tests/telegrams/test_fields.py @@ -3,16 +3,7 @@ import pytest -from pymbus.telegrams.fields import ( - AF_BROADCAST_ALL_SLAVES_REPLY_BYTE, - AF_BROADCAST_NO_SLAVE_REPLIES_BYTE, - AF_NETWORK_LAYER_BYTE, - AF_SLAVE_MAX_RANGE_VALUE_BYTE, - AF_SLAVE_MIN_RANGE_VALUE_BYTE, - AF_UNCONFIGURED_SLAVE_BYTE, - AddressField, - ControlField, -) +from pymbus.telegrams.fields import AddressField, ControlField from pymbus.telegrams.fields import ( DataInformationField as DIF, ) @@ -29,19 +20,14 @@ class TestAddressField: def test_is_unconfigured_slave(self): - af = AddressField(AF_UNCONFIGURED_SLAVE_BYTE) + af = AddressField(0) assert af.is_unconfigured_slave() assert not af.is_configured_slave() @pytest.mark.parametrize( "byte", - [ - AF_SLAVE_MIN_RANGE_VALUE_BYTE, - AF_SLAVE_MIN_RANGE_VALUE_BYTE + 1, - AF_SLAVE_MAX_RANGE_VALUE_BYTE - 1, - AF_SLAVE_MAX_RANGE_VALUE_BYTE, - ], + [1, 2, 249, 250], ) def test_is_configured_slave(self, byte: int): af = AddressField(byte) @@ -52,11 +38,11 @@ def test_is_configured_slave(self, byte: int): @pytest.mark.parametrize( "byte", [ - AF_UNCONFIGURED_SLAVE_BYTE, - AF_SLAVE_MIN_RANGE_VALUE_BYTE, - AF_SLAVE_MIN_RANGE_VALUE_BYTE + 1, - AF_SLAVE_MAX_RANGE_VALUE_BYTE - 1, - AF_SLAVE_MAX_RANGE_VALUE_BYTE, + 0, + 1, + 2, + 249, + 250, ], ) def test_is_slave(self, byte: int): @@ -67,9 +53,9 @@ def test_is_slave(self, byte: int): @pytest.mark.parametrize( ("byte", "answer"), [ - (AF_BROADCAST_ALL_SLAVES_REPLY_BYTE, True), - (AF_BROADCAST_NO_SLAVE_REPLIES_BYTE, True), - (AF_NETWORK_LAYER_BYTE, False), + (254, True), + (255, True), + (253, False), ], ) def test_is_broadcast(self, byte: int, answer: bool): @@ -78,7 +64,7 @@ def test_is_broadcast(self, byte: int, answer: bool): assert af.is_broadcast() == answer def test_is_network_layer(self): - af = AddressField(AF_NETWORK_LAYER_BYTE) + af = AddressField(253) assert not af.is_broadcast() assert not af.is_slave() @@ -187,7 +173,7 @@ class TestDIF: ], ) def test_dif_extension_bit(self, byte: int, ext_bit: int): - dif = DIF(byte=byte) + dif = DIF(byte) assert dif.extension == ext_bit @@ -199,7 +185,7 @@ def test_dif_extension_bit(self, byte: int, ext_bit: int): ], ) def test_dif_storage_number_lsb(self, byte: int, sn_lsb: int): - dif = DIF(byte=byte) + dif = DIF(byte) assert dif.storage_number_lsb == sn_lsb @@ -213,7 +199,7 @@ def test_dif_storage_number_lsb(self, byte: int, sn_lsb: int): ], ) def test_dif_function_field(self, byte: int, function_field: int): - dif = DIF(byte=byte) + dif = DIF(byte) assert dif.function == function_field @@ -227,7 +213,7 @@ def test_dif_function_field(self, byte: int, function_field: int): ], ) def test_dif_data_field(self, byte: int, data_field: int): - dif = DIF(byte=byte) + dif = DIF(byte) assert dif.data == data_field @@ -241,7 +227,7 @@ class TestDIFE: ], ) def test_dife_extension_bit(self, byte: int, ext_bit: int): - dif = DIFE(byte=byte) + dif = DIFE(byte) assert dif.extension == ext_bit @@ -253,7 +239,7 @@ def test_dife_extension_bit(self, byte: int, ext_bit: int): ], ) def test_dife_storage_number_lsb(self, byte: int, device_unit: int): - dif = DIFE(byte=byte) + dif = DIFE(byte) assert dif.device_unit == device_unit @@ -267,7 +253,7 @@ def test_dife_storage_number_lsb(self, byte: int, device_unit: int): ], ) def test_dife_tariff(self, byte: int, tariff: int): - dif = DIFE(byte=byte) + dif = DIFE(byte) assert dif.tariff == tariff @@ -281,7 +267,7 @@ def test_dife_tariff(self, byte: int, tariff: int): ], ) def test_dife_storage_number(self, byte: int, storage_number: int): - dif = DIFE(byte=byte) + dif = DIFE(byte) assert dif.storage_number == storage_number @@ -295,7 +281,7 @@ class TestVIF: ], ) def test_vif_extension_bit(self, byte: int, ext_bit: int): - vif = VIF(byte=byte) + vif = VIF(byte) assert vif.extension == ext_bit @@ -307,7 +293,7 @@ def test_vif_extension_bit(self, byte: int, ext_bit: int): ], ) def test_vif_unit(self, byte: int, unit: int): - vif = VIF(byte=byte) + vif = VIF(byte) assert vif.unit == unit @@ -321,7 +307,7 @@ class TestVIFE: ], ) def test_vife_extension_bit(self, byte: int, ext_bit: int): - vife = VIFE(byte=byte) + vife = VIFE(byte) assert vife.extension == ext_bit @@ -333,6 +319,6 @@ def test_vife_extension_bit(self, byte: int, ext_bit: int): ], ) def test_vife_unit(self, byte: int, unit: int): - vife = VIFE(byte=byte) + vife = VIFE(byte) assert vife.unit == unit diff --git a/tests/telegrams/test_frames.py b/tests/telegrams/test_frames.py index 4bb6827..6a84638 100644 --- a/tests/telegrams/test_frames.py +++ b/tests/telegrams/test_frames.py @@ -25,14 +25,11 @@ class TestSingleFrame: [ ([ACK_BYTE], does_not_raise()), ([ACK_BYTE - 1], pytest.raises(MBusValidationError)), - ([], pytest.raises(MBusLengthError)), - ([ACK_BYTE, ACK_BYTE], pytest.raises(MBusLengthError)), + ([], does_not_raise()), + ([ACK_BYTE, ACK_BYTE], does_not_raise()), ], ) def test_init(self, it: list[int], expectation: AbstractContextManager): - with expectation: - SingleFrame.from_integers(it) - with expectation: SingleFrame(it) @@ -69,9 +66,6 @@ class TestShortFrame: ], ) def test_init(self, it: list[int], expectation: AbstractContextManager): - with expectation: - ShortFrame.from_integers(it) - with expectation: ShortFrame(it) @@ -130,9 +124,6 @@ class TestControlFrame: ], ) def test_init(self, it: list[int], expectation: AbstractContextManager): - with expectation: - ControlFrame.from_integers(it) - with expectation: ControlFrame(it) @@ -219,9 +210,6 @@ class TestLongFrame: ], ) def test_init(self, it: list[int], expectation: AbstractContextManager): - with expectation: - LongFrame.from_integers(it) - with expectation: LongFrame(it) diff --git a/tests/telegrams/test_records.py b/tests/telegrams/test_records.py index 69ef525..f30f121 100644 --- a/tests/telegrams/test_records.py +++ b/tests/telegrams/test_records.py @@ -58,7 +58,7 @@ def test_non_greediness(self, data: bytes, answer: list): fields = [field for block in blocks for field in block] nfields = len(fields) - assert dr.as_bytes() == data[:nfields] + assert bytes(map(int, dr)) == data[:nfields] assert blocks == answer assert list(gen) == list(map(int, data[nfields:])) @@ -88,4 +88,4 @@ def test_init( dr = DR(it) assert dr.drh == drh - assert dr.data.as_ints() == data + assert list(map(int, dr.data)) == data