Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .github/workflows/mapache-go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: mapache-go

on:
push:
branches: [main]
paths:
- "mapache-go/**"
pull_request:
branches: [main]
paths:
- "mapache-go/**"

defaults:
run:
working-directory: mapache-go

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: "1.26"

- name: Vet
run: go vet ./...

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: "1.26"

- name: Test
run: go test ./... -v
58 changes: 58 additions & 0 deletions .github/workflows/mapache-py.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
name: mapache-py

on:
push:
branches: [main]
paths:
- "mapache-py/**"
pull_request:
branches: [main]
paths:
- "mapache-py/**"

defaults:
run:
working-directory: mapache-py

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: "3.13"

- name: Install dependencies
run: pip install -e ".[dev]"

- name: Lint
run: |
ruff format --check src/ tests/
ruff check src/ tests/

- name: Type check
run: mypy src/

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: |
3.10
3.11
3.12
3.13

- name: Test
run: |
for ver in 3.10 3.11 3.12 3.13; do
echo "::group::Python $ver"
python$ver -m pip install -e ".[dev]"
python$ver -m pytest tests/ -v
echo "::endgroup::"
done
40 changes: 40 additions & 0 deletions mapache-py/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "gr-mapache"
version = "0.1.0"
description = "Mapache telemetry library for Gaucho Racing"
license = "MIT"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]

[project.optional-dependencies]
dev = [
"pytest",
"ruff",
"mypy",
]

[tool.hatch.build.targets.wheel]
packages = ["src/mapache"]

[tool.ruff]
target-version = "py310"
line-length = 120

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]

[tool.mypy]
strict = true
46 changes: 46 additions & 0 deletions mapache-py/src/mapache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from mapache.binary import (
big_endian_bytes_to_signed_int,
big_endian_bytes_to_unsigned_int,
big_endian_signed_int_to_binary,
big_endian_signed_int_to_binary_string,
big_endian_unsigned_int_to_binary,
big_endian_unsigned_int_to_binary_string,
little_endian_bytes_to_signed_int,
little_endian_bytes_to_unsigned_int,
little_endian_signed_int_to_binary,
little_endian_signed_int_to_binary_string,
little_endian_unsigned_int_to_binary,
little_endian_unsigned_int_to_binary_string,
)
from mapache.message import Endian, ExportSignalFunc, Field, Message, SignMode, new_field
from mapache.ping import Ping
from mapache.signal import Signal
from mapache.vehicle import Marker, Segment, Session, Vehicle, derive_segments

__all__ = [
"big_endian_bytes_to_signed_int",
"big_endian_bytes_to_unsigned_int",
"big_endian_signed_int_to_binary",
"big_endian_signed_int_to_binary_string",
"big_endian_unsigned_int_to_binary",
"big_endian_unsigned_int_to_binary_string",
"derive_segments",
"little_endian_bytes_to_signed_int",
"little_endian_bytes_to_unsigned_int",
"little_endian_signed_int_to_binary",
"little_endian_signed_int_to_binary_string",
"little_endian_unsigned_int_to_binary",
"little_endian_unsigned_int_to_binary_string",
"new_field",
"Endian",
"ExportSignalFunc",
"Field",
"Marker",
"Message",
"Ping",
"Segment",
"Session",
"Signal",
"SignMode",
"Vehicle",
]
173 changes: 173 additions & 0 deletions mapache-py/src/mapache/binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from __future__ import annotations

import struct


def big_endian_unsigned_int_to_binary_string(num: int, num_bytes: int) -> str:
b = big_endian_unsigned_int_to_binary(num, num_bytes)
return "".join(f"{byte:08b}" for byte in b)


def big_endian_unsigned_int_to_binary(num: int, num_bytes: int) -> bytes:
if num < 0:
raise ValueError("cannot convert negative number to binary")
if num_bytes < 1:
raise ValueError("cannot convert to binary with less than 1 byte")
if num_bytes < 8 and num >= (1 << (num_bytes * 8)):
raise ValueError(f"number is too large to fit in {num_bytes} bytes")

if num_bytes == 1:
return struct.pack(">B", num)
elif num_bytes == 2:
return struct.pack(">H", num)
elif num_bytes == 4:
return struct.pack(">I", num)
elif num_bytes == 8:
return struct.pack(">Q", num)

result = bytearray(num_bytes)
for i in range(num_bytes):
result[i] = (num >> ((num_bytes - i - 1) * 8)) & 0xFF
return bytes(result)


def big_endian_signed_int_to_binary_string(num: int, num_bytes: int) -> str:
b = big_endian_signed_int_to_binary(num, num_bytes)
return "".join(f"{byte:08b}" for byte in b)


def big_endian_signed_int_to_binary(num: int, num_bytes: int) -> bytes:
if num_bytes < 1:
raise ValueError("cannot convert to binary with less than 1 byte")
min_value = -(1 << ((num_bytes * 8) - 1))
max_value = (1 << ((num_bytes * 8) - 1)) - 1
if num < min_value or num > max_value:
raise ValueError(f"number is too large to fit in {num_bytes} bytes")

if num_bytes == 1:
return struct.pack(">b", num)
elif num_bytes == 2:
return struct.pack(">h", num)
elif num_bytes == 4:
return struct.pack(">i", num)
elif num_bytes == 8:
return struct.pack(">q", num)

if num < 0:
num = (1 << (num_bytes * 8)) + num
result = bytearray(num_bytes)
for i in range(num_bytes):
result[i] = (num >> ((num_bytes - i - 1) * 8)) & 0xFF
return bytes(result)


def big_endian_bytes_to_unsigned_int(data: bytes | bytearray) -> int:
result = 0
for i, b in enumerate(data):
result += b << ((len(data) - i - 1) * 8)
return result


def big_endian_bytes_to_signed_int(data: bytes | bytearray) -> int:
n = len(data)
if n == 1:
return int(struct.unpack(">b", data)[0])
elif n == 2:
return int(struct.unpack(">h", data)[0])
elif n == 4:
return int(struct.unpack(">i", data)[0])
elif n == 8:
return int(struct.unpack(">q", data)[0])

# fallback for arbitrary byte lengths
result = 0
if data[0] >= 128:
result = -1 << ((n - 1) * 8)
for i, b in enumerate(data):
result += b << ((n - i - 1) * 8)
return result


def little_endian_unsigned_int_to_binary_string(num: int, num_bytes: int) -> str:
b = little_endian_unsigned_int_to_binary(num, num_bytes)
return "".join(f"{byte:08b}" for byte in b)


def little_endian_unsigned_int_to_binary(num: int, num_bytes: int) -> bytes:
if num < 0:
raise ValueError("cannot convert negative number to binary")
if num_bytes < 1:
raise ValueError("cannot convert to binary with less than 1 byte")
if num_bytes < 8 and num >= (1 << (num_bytes * 8)):
raise ValueError(f"number is too large to fit in {num_bytes} bytes")

if num_bytes == 1:
return struct.pack("<B", num)
elif num_bytes == 2:
return struct.pack("<H", num)
elif num_bytes == 4:
return struct.pack("<I", num)
elif num_bytes == 8:
return struct.pack("<Q", num)

result = bytearray(num_bytes)
for i in range(num_bytes):
result[i] = (num >> (i * 8)) & 0xFF
return bytes(result)


def little_endian_signed_int_to_binary_string(num: int, num_bytes: int) -> str:
b = little_endian_signed_int_to_binary(num, num_bytes)
return "".join(f"{byte:08b}" for byte in b)


def little_endian_signed_int_to_binary(num: int, num_bytes: int) -> bytes:
if num_bytes < 1:
raise ValueError("cannot convert to binary with less than 1 byte")
min_value = -(1 << ((num_bytes * 8) - 1))
max_value = (1 << ((num_bytes * 8) - 1)) - 1
if num < min_value or num > max_value:
raise ValueError(f"number is too large to fit in {num_bytes} bytes")

if num_bytes == 1:
return struct.pack("<b", num)
elif num_bytes == 2:
return struct.pack("<h", num)
elif num_bytes == 4:
return struct.pack("<i", num)
elif num_bytes == 8:
return struct.pack("<q", num)

if num < 0:
num = (1 << (num_bytes * 8)) + num
result = bytearray(num_bytes)
for i in range(num_bytes):
result[i] = (num >> (i * 8)) & 0xFF
return bytes(result)


def little_endian_bytes_to_unsigned_int(data: bytes | bytearray) -> int:
result = 0
for i, b in enumerate(data):
result += b << (i * 8)
return result


def little_endian_bytes_to_signed_int(data: bytes | bytearray) -> int:
n = len(data)
if n == 1:
return int(struct.unpack("<b", data)[0])
elif n == 2:
return int(struct.unpack("<h", data)[0])
elif n == 4:
return int(struct.unpack("<i", data)[0])
elif n == 8:
return int(struct.unpack("<q", data)[0])

# fallback for arbitrary byte lengths
result = 0
if data[-1] >= 128:
result = -1 << ((n - 1) * 8)
for i, b in enumerate(data):
result += b << (i * 8)
return result
Loading
Loading