Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions src/vehicle.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def __init__(
# treat high voltage battery as active, if we don't have any other information
self.__hv_battery_active = True
self.__hv_battery_active_from_car = True
self.is_charging = False
self.__is_charging = False
self.__had_significant_charging_power = False
self.refresh_period_active = -1
self.refresh_period_inactive = -1
self.refresh_period_after_shutdown = -1
Expand Down Expand Up @@ -285,6 +286,24 @@ def is_complete(self) -> bool:
and self.refresh_mode is not None
)

@property
def is_charging(self) -> bool:
return self.__is_charging

@is_charging.setter
def is_charging(self, new_state: bool) -> None:
old_state = self.__is_charging
if old_state and not new_state and self.__had_significant_charging_power:
self.last_car_shutdown = datetime.datetime.now(tz=datetime.UTC)
LOG.info(
"Charging stopped for vehicle %s, resetting last_car_shutdown to %s",
self.vin,
self.last_car_shutdown,
)
if not new_state:
self.__had_significant_charging_power = False
self.__is_charging = new_state

def set_is_charging(self, is_charging: bool) -> None:
self.is_charging = is_charging
self.hv_battery_active = self.is_charging
Expand Down Expand Up @@ -578,6 +597,8 @@ def handle_charge_status(
self.is_charging = result.is_charging or False

if self.is_charging and result.power is not None and result.power < -1:
# Mark that we've seen significant charging power (> 1kW)
self.__had_significant_charging_power = True
# Only compute a dynamic refresh period if we have detected at least 1kW of power during charging
time_for_1pct = (
36.0 * result.real_total_battery_capacity / abs(result.power)
Expand All @@ -599,11 +620,11 @@ def handle_charge_status(
elif not self.is_charging:
# Reset the charging refresh period if we detected we are no longer charging
self.set_refresh_period_charging(0)
else:
# Otherwise let's keep the last computed refresh period
# This avoids falling back to the active refresh period which, being too often, results in a ban from
# the SAIC API
pass
elif self.refresh_period_charging < self.refresh_period_after_shutdown:
# Charging with insignificant power (< 1kW, e.g. OBC trickle/maintenance).
# Use at least the after-shutdown period to avoid polling at the active rate (30s)
# indefinitely, which wastes API quota and can drain the 12V battery.
self.set_refresh_period_charging(self.refresh_period_after_shutdown)

return result

Expand Down
152 changes: 151 additions & 1 deletion tests/test_vehicle_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from configuration import Configuration
from exceptions import VehicleStatusDriftException
import mqtt_topics
from vehicle import RefreshMode, VehicleState
from vehicle import PollingPhase, RefreshMode, VehicleState
from vehicle_info import VehicleInfo

from .common_mocks import (
Expand Down Expand Up @@ -336,6 +336,156 @@ def test_mileage_of_day_published_when_no_vehicle_status_yet(self) -> None:
DRIVETRAIN_MILEAGE_OF_DAY,
)

def test_is_charging_true_to_false_resets_last_car_shutdown(self) -> None:
"""When is_charging transitions True -> False after significant power, last_car_shutdown is updated."""
old_shutdown = self.vehicle_state.last_car_shutdown
self.vehicle_state.is_charging = True
# Simulate significant charging power detected via handle_charge_status
self.vehicle_state._VehicleState__had_significant_charging_power = True
self.vehicle_state.is_charging = False
assert self.vehicle_state.last_car_shutdown > old_shutdown

def test_is_charging_true_to_false_without_significant_power_does_not_reset(
self,
) -> None:
"""When is_charging transitions True -> False without significant power, no reset."""
old_shutdown = self.vehicle_state.last_car_shutdown
self.vehicle_state.is_charging = True
# No significant power flag set (e.g. OBC trickle)
self.vehicle_state.is_charging = False
assert self.vehicle_state.last_car_shutdown == old_shutdown

def test_is_charging_false_to_false_does_not_reset_last_car_shutdown(self) -> None:
"""When is_charging stays False, last_car_shutdown is not updated."""
old_shutdown = self.vehicle_state.last_car_shutdown
self.vehicle_state.is_charging = False
assert self.vehicle_state.last_car_shutdown == old_shutdown

def test_is_charging_true_to_true_does_not_reset_last_car_shutdown(self) -> None:
"""When is_charging stays True, last_car_shutdown is not updated."""
self.vehicle_state.is_charging = True
old_shutdown = self.vehicle_state.last_car_shutdown
self.vehicle_state.is_charging = True
assert self.vehicle_state.last_car_shutdown == old_shutdown

def test_should_refresh_off_publishes_off_phase(self) -> None:
self.vehicle_state.configure_missing()
self.vehicle_state.set_refresh_mode(RefreshMode.OFF, "test")
self.publisher.map.clear()
result = self.vehicle_state.should_refresh()
assert result is False
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.OFF.value,
)

def test_should_refresh_force_publishes_force_phase(self) -> None:
self.vehicle_state.configure_missing()
self.vehicle_state.set_refresh_mode(RefreshMode.FORCE, "test")
self.publisher.map.clear()
result = self.vehicle_state.should_refresh()
assert result is True
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.FORCE.value,
)

def test_should_refresh_charging_detection_resets_shutdown(self) -> None:
self.vehicle_state.configure_missing()
old_shutdown = self.vehicle_state.last_car_shutdown
self.vehicle_state.set_refresh_mode(RefreshMode.CHARGING_DETECTION, "test")
self.publisher.map.clear()
result = self.vehicle_state.should_refresh()
assert result is True
assert self.vehicle_state.last_car_shutdown >= old_shutdown
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.CHARGING_DETECTION.value,
)

def test_should_refresh_charging_detection_reverts_to_previous(self) -> None:
self.vehicle_state.configure_missing()
# Previous mode is PERIODIC (set by configure_missing)
self.vehicle_state.set_refresh_mode(RefreshMode.CHARGING_DETECTION, "test")
self.vehicle_state.should_refresh()
assert self.vehicle_state.refresh_mode == RefreshMode.PERIODIC

def test_should_refresh_charging_detection_from_off_reverts_to_off(self) -> None:
self.vehicle_state.configure_missing()
self.vehicle_state.set_refresh_mode(RefreshMode.OFF, "test")
self.vehicle_state.set_refresh_mode(RefreshMode.CHARGING_DETECTION, "test")
self.vehicle_state.should_refresh()
assert self.vehicle_state.refresh_mode == RefreshMode.OFF

def test_periodic_inactive_publishes_inactive_phase(self) -> None:
self.vehicle_state.configure_missing()
# Ensure the car is inactive and grace period has passed
self.vehicle_state.hv_battery_active = False
self.vehicle_state.last_car_shutdown = datetime.datetime.min.replace(
tzinfo=datetime.UTC
)
self.vehicle_state.last_car_activity = datetime.datetime.min.replace(
tzinfo=datetime.UTC
)
self.vehicle_state.last_successful_refresh = datetime.datetime.now(
tz=datetime.UTC
)
self.publisher.map.clear()
result = self.vehicle_state.should_refresh()
assert result is False
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.INACTIVE.value,
)

def test_periodic_after_shutdown_publishes_after_shutdown_phase(self) -> None:
self.vehicle_state.configure_missing()
# Car just shut down, grace period active
self.vehicle_state.hv_battery_active = False
self.vehicle_state.last_car_shutdown = datetime.datetime.now(
tz=datetime.UTC
)
self.vehicle_state.last_car_activity = datetime.datetime.min.replace(
tzinfo=datetime.UTC
)
self.vehicle_state.last_successful_refresh = datetime.datetime.now(
tz=datetime.UTC
)
self.publisher.map.clear()
result = self.vehicle_state.should_refresh()
# Should not refresh yet (just refreshed) but phase should be after_shutdown
assert result is False
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.AFTER_SHUTDOWN.value,
)

def test_charging_stop_triggers_after_shutdown_grace(self) -> None:
"""End-to-end: charging stops -> last_car_shutdown resets -> after_shutdown phase."""
self.vehicle_state.configure_missing()
# Car is parked (off), only charging keeps it "active"
self.vehicle_state.hv_battery_active = False
self.vehicle_state.is_charging = True
self.vehicle_state._VehicleState__had_significant_charging_power = True
self.vehicle_state.hv_battery_active = True
# Simulate a recent successful refresh
self.vehicle_state.last_successful_refresh = datetime.datetime.now(
tz=datetime.UTC
)
self.vehicle_state.last_car_activity = datetime.datetime.min.replace(
tzinfo=datetime.UTC
)
# Charging stops (e.g. phase switch) — car itself is off
self.vehicle_state.is_charging = False
self.vehicle_state.hv_battery_active = False
self.publisher.map.clear()
self.vehicle_state.should_refresh()
# Grace period is now active, phase should be after_shutdown
self.assert_mqtt_topic(
self.get_topic(mqtt_topics.REFRESH_POLLING_PHASE),
PollingPhase.AFTER_SHUTDOWN.value,
)

@staticmethod
def get_topic(sub_topic: str) -> str:
return f"/vehicles/{VIN}/{sub_topic}"
Loading