Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simvue/api/objects/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get(
_class_instance = cls(_read_only=True, _local=True)
_count: int = 0

for response in cls._get_all_objects(offset, run=run_id, **kwargs):
for response in cls._get_all_objects(offset, count=count, run=run_id, **kwargs):
if (_data := response.get("data")) is None:
raise RuntimeError(
f"Expected key 'data' for retrieval of {_class_instance.__class__.__name__.lower()}s"
Expand Down
23 changes: 9 additions & 14 deletions simvue/api/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,18 @@ def get_paginated(
server response
"""
_offset: int = offset or 0

while (
(
_response := get(
url=url,
headers=headers,
params=(params or {})
| {"count": count or MAX_ENTRIES_PER_PAGE, "start": _offset},
timeout=timeout,
json=json,
)
_response := get(
url=url,
headers=headers,
params=(params or {})
| {"count": count or MAX_ENTRIES_PER_PAGE, "start": _offset},
timeout=timeout,
json=json,
)
.json()
.get("data")
):
).json():
yield _response
_offset += MAX_ENTRIES_PER_PAGE

if count and _offset > count:
if (count and _offset > count) or (_response.json().get("count", 0) < _offset):
break
6 changes: 4 additions & 2 deletions simvue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ def get_metric_values(

_args = {"filters": json.dumps(run_filters)} if run_filters else {}

_run_data = dict(Run.get(**_args))
if not run_ids:
_run_data = dict(Run.get(**_args))

if not (
_run_metrics := self._get_run_metrics_from_server(
Expand All @@ -853,7 +854,8 @@ def get_metric_values(
)
if use_run_names:
_run_metrics = {
_run_data[key].name: _run_metrics[key] for key in _run_metrics.keys()
Run(identifier=key).name: _run_metrics[key]
for key in _run_metrics.keys()
}
return parse_run_set_metrics(
_run_metrics,
Expand Down
1 change: 1 addition & 0 deletions simvue/config/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def fetch(
_default_dir = _config_dict["offline"].get(
"cache", DEFAULT_OFFLINE_DIRECTORY
)
pathlib.Path(_default_dir).mkdir(parents=True, exist_ok=True)

_config_dict["offline"]["cache"] = _default_dir

Expand Down
7 changes: 2 additions & 5 deletions simvue/eco/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,15 @@ def __init__(self, *args, **kwargs) -> None:
co2_api_endpoint : str
endpoint for CO2 signal API
co2_api_token: str
RECOMMENDED. The API token for the CO2 Signal API, default is None.
The API token for the ElectricityMaps API, default is None.
timeout : int
timeout for API
"""
super().__init__(*args, **kwargs)
self._logger = logging.getLogger(self.__class__.__name__)

if not self.co2_api_token:
self._logger.warning(
"⚠️ No API token provided for CO2 Signal, "
"use of a token is strongly recommended."
)
raise ValueError("API token is required for ElectricityMaps API.")

self._get_user_location_info()

Expand Down
23 changes: 1 addition & 22 deletions simvue/eco/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
__date__ = "2025-03-06"

import pydantic
import pathlib
import os

from simvue.config.files import DEFAULT_OFFLINE_DIRECTORY


class EcoConfig(pydantic.BaseModel):
Expand All @@ -25,30 +21,13 @@ class EcoConfig(pydantic.BaseModel):
the TDP for the CPU
gpu_thermal_design_power: int | None, optional
the TDP for each GPU
local_data_directory: str, optional
the directory to store local data, default is Simvue offline directory
"""

co2_signal_api_token: pydantic.SecretStr | None = None
cpu_thermal_design_power: pydantic.PositiveInt | None = None
cpu_n_cores: pydantic.PositiveInt | None = None
gpu_thermal_design_power: pydantic.PositiveInt | None = None
local_data_directory: pydantic.DirectoryPath | None = pydantic.Field(
None, validate_default=True
)
intensity_refresh_interval: pydantic.PositiveInt | str | None = pydantic.Field(
default="1 day", gt=2 * 60
default="1 hour", gt=2 * 60
)
co2_intensity: float | None = None

@pydantic.field_validator("local_data_directory", mode="before", check_fields=True)
@classmethod
def check_local_data_env(
cls, local_data_directory: pathlib.Path | None
) -> pathlib.Path:
if _data_directory := os.environ.get("SIMVUE_ECO_DATA_DIRECTORY"):
return pathlib.Path(_data_directory)
if not local_data_directory:
local_data_directory = pathlib.Path(DEFAULT_OFFLINE_DIRECTORY)
local_data_directory.mkdir(exist_ok=True, parents=True)
return local_data_directory
36 changes: 21 additions & 15 deletions simvue/eco/emissions_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ def __init__(self, *args, **kwargs) -> None:
"""
_logger = logging.getLogger(self.__class__.__name__)

if not (
kwargs.get("co2_intensity")
or kwargs.get("co2_signal_api_token")
or kwargs.get("offline")
):
raise ValueError(
"ElectricityMaps API token or hardcoeded CO2 intensity value is required for emissions tracking."
)

if not isinstance(kwargs.get("thermal_design_power_per_cpu"), float):
kwargs["thermal_design_power_per_cpu"] = 80.0
_logger.warning(
Expand Down Expand Up @@ -229,7 +238,6 @@ def estimate_co2_emissions(

if self.co2_intensity:
_current_co2_intensity = self.co2_intensity
_co2_units = "kgCO2/kWh"
else:
self.check_refresh()
# If no local data yet then return
Expand All @@ -238,7 +246,7 @@ def estimate_co2_emissions(
"No CO2 emission data recorded as no CO2 intensity value "
"has been provided and there is no local intensity data available."
)
return
return False

if self._client:
_country_code = self._client.country_code
Expand All @@ -251,10 +259,8 @@ def estimate_co2_emissions(
**self._local_data[_country_code]
)
_current_co2_intensity = self._current_co2_data.data.carbon_intensity
_co2_units = self._current_co2_data.carbon_intensity_units
_process.gpu_percentage = gpu_percent
_process.cpu_percentage = cpu_percent
_previous_energy: float = _process.total_energy
_process.power_usage = (_process.cpu_percentage / 100.0) * (
self.thermal_design_power_per_cpu / self.n_cores_per_cpu
)
Expand All @@ -263,23 +269,23 @@ def estimate_co2_emissions(
_process.power_usage += (
_process.gpu_percentage / 100.0
) * self.thermal_design_power_per_gpu
# Convert W to kW
_process.power_usage /= 1000
# Measure energy in kWh
_process.energy_delta = _process.power_usage * measure_interval / 3600
_process.total_energy += _process.energy_delta

_process.total_energy += _process.power_usage * measure_interval
_process.energy_delta = _process.total_energy - _previous_energy

# Measured value is in g/kWh, convert to kg/kWs
_carbon_intensity_kgpws: float = _current_co2_intensity / (60 * 60 * 1e3)

_process.co2_delta = (
_process.power_usage * _carbon_intensity_kgpws * measure_interval
)
# Measured value is in g/kWh, convert to kg/kWh
_carbon_intensity: float = _current_co2_intensity / 1000

_process.co2_delta = _process.energy_delta * _carbon_intensity
_process.co2_emission += _process.co2_delta

self._logger.debug(
f"📝 For process '{process_id}', recorded: CPU={_process.cpu_percentage:.2f}%, "
f"Power={_process.power_usage:.2f}W, CO2={_process.co2_emission:.2e}{_co2_units}"
f"📝 For process '{process_id}', in interval {measure_interval}, recorded: CPU={_process.cpu_percentage:.2f}%, "
f"Power={_process.power_usage:.2f}kW, Energy = {_process.energy_delta}kWh, CO2={_process.co2_delta:.2e}kg"
)
return True

def simvue_metrics(self) -> dict[str, float]:
"""Retrieve metrics to send to Simvue server."""
Expand Down
17 changes: 9 additions & 8 deletions simvue/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,19 +374,20 @@ def _get_internal_metrics(
# For the first emissions metrics reading, the time interval to use
# Is the time since the run started, otherwise just use the time between readings
if self._emissions_monitor:
self._emissions_monitor.estimate_co2_emissions(
_estimated = self._emissions_monitor.estimate_co2_emissions(
process_id=f"{self._name}",
cpu_percent=_current_system_measure.cpu_percent,
measure_interval=(time.time() - self._start_time)
if system_metrics_step == 0
else self._system_metrics_interval,
gpu_percent=_current_system_measure.gpu_percent,
)
self._add_metrics_to_dispatch(
self._emissions_monitor.simvue_metrics(),
join_on_fail=False,
step=system_metrics_step,
)
if _estimated:
self._add_metrics_to_dispatch(
self._emissions_monitor.simvue_metrics(),
join_on_fail=False,
step=system_metrics_step,
)

def _create_heartbeat_callback(
self,
Expand Down Expand Up @@ -1083,7 +1084,7 @@ def config(
self._emissions_monitor = CO2Monitor(
intensity_refresh_interval=None,
co2_intensity=self._user_config.eco.co2_intensity,
local_data_directory=self._user_config.eco.local_data_directory,
local_data_directory=self._user_config.offline.cache,
co2_signal_api_token=None,
thermal_design_power_per_cpu=self._user_config.eco.cpu_thermal_design_power,
thermal_design_power_per_gpu=self._user_config.eco.gpu_thermal_design_power,
Expand All @@ -1092,7 +1093,7 @@ def config(
else:
self._emissions_monitor = CO2Monitor(
intensity_refresh_interval=self._user_config.eco.intensity_refresh_interval,
local_data_directory=self._user_config.eco.local_data_directory,
local_data_directory=self._user_config.offline.cache,
co2_signal_api_token=self._user_config.eco.co2_signal_api_token,
co2_intensity=self._user_config.eco.co2_intensity,
thermal_design_power_per_cpu=self._user_config.eco.cpu_thermal_design_power,
Expand Down
14 changes: 3 additions & 11 deletions simvue/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ def sender(
max_workers: int = 5,
threading_threshold: int = 10,
objects_to_upload: list[str] = UPLOAD_ORDER,
co2_intensity_refresh: int | None | str = None,
) -> dict[str, str]:
"""Send data from a local cache directory to the Simvue server.

Expand All @@ -165,9 +164,6 @@ def sender(
The number of cached files above which threading will be used
objects_to_upload : list[str]
Types of objects to upload, by default uploads all types of objects present in cache
co2_intensity_refresh: int | None | str
the refresh interval for the CO2 intensity value, if None use config value if available,
else do not refresh.

Returns
-------
Expand Down Expand Up @@ -249,17 +245,13 @@ def sender(
# refreshes the CO2 intensity value if required. No emission metrics
# will be taken by the sender itself, values are assumed to be recorded
# by any offline runs being sent.

if (
_refresh_interval := co2_intensity_refresh
or _user_config.eco.intensity_refresh_interval
):
if _user_config.metrics.enable_emission_metrics:
CO2Monitor(
thermal_design_power_per_gpu=None,
thermal_design_power_per_cpu=None,
local_data_directory=cache_dir,
intensity_refresh_interval=_refresh_interval,
co2_intensity=co2_intensity_refresh or _user_config.eco.co2_intensity,
intensity_refresh_interval=_user_config.eco.intensity_refresh_interval,
co2_intensity=_user_config.eco.co2_intensity,
co2_signal_api_token=_user_config.eco.co2_signal_api_token,
).check_refresh()

Expand Down
37 changes: 21 additions & 16 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,17 @@ def clear_out_files() -> None:

for file_obj in out_files:
file_obj.unlink()



@pytest.fixture()
def offline_cache_setup(monkeypatch: monkeypatch.MonkeyPatch):
# Will be executed before test
cache_dir = tempfile.TemporaryDirectory()
monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", cache_dir.name)
yield cache_dir
# Will be executed after test
cache_dir.cleanup()
monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", None)

@pytest.fixture
def mock_co2_signal(monkeypatch: monkeypatch.MonkeyPatch) -> dict[str, dict | str]:
_mock_data = {
Expand Down Expand Up @@ -86,13 +95,21 @@ def _mock_location_info(self) -> None:

monkeypatch.setattr(requests, "get", _mock_get)
monkeypatch.setattr(sv_eco.APIClient, "_get_user_location_info", _mock_location_info)


_fetch = sv_cfg.SimvueConfiguration.fetch
@classmethod
def _mock_fetch(cls, *args, **kwargs) -> sv_cfg.SimvueConfiguration:
_conf = _fetch(*args, **kwargs)
_conf.eco.co2_signal_api_token = "test_token"
_conf.metrics.enable_emission_metrics = True
return _conf
monkeypatch.setattr(sv_cfg.SimvueConfiguration, "fetch", _mock_fetch)
return _mock_data


@pytest.fixture
def speedy_heartbeat(monkeypatch: monkeypatch.MonkeyPatch) -> None:
monkeypatch.setattr(sv_run, "HEARTBEAT_INTERVAL", 0.1)
monkeypatch.setattr(sv_run, "HEARTBEAT_INTERVAL", 1)


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -286,15 +303,3 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur
TEST_DATA["alert_ids"] = _alert_ids

return TEST_DATA


@pytest.fixture
def offline_test() -> pathlib.Path:
with tempfile.TemporaryDirectory() as tempd:
_tempdir = pathlib.Path(tempd)
_cache_dir = _tempdir.joinpath(".simvue")
_cache_dir.mkdir(exist_ok=True)
os.environ["SIMVUE_OFFLINE_DIRECTORY"] = f"{_cache_dir}"
assert sv_cfg.SimvueConfiguration.fetch().offline.cache == _cache_dir
yield _tempdir

Loading