From 19d4e16a88b8511757a78b73658ac57012af0922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sylvain=20Monn=C3=A9?= Date: Sun, 11 Jan 2026 19:40:51 +0100 Subject: [PATCH 1/3] Migrate Notify.js CDN URL from RawGit to jsDelivr --- src/templates/html/head.html | 2 +- src/templates/index.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/templates/html/head.html b/src/templates/html/head.html index 44224f3b..a6781bfe 100644 --- a/src/templates/html/head.html +++ b/src/templates/html/head.html @@ -19,7 +19,7 @@ - + diff --git a/src/templates/index.html b/src/templates/index.html index a3b769f1..072b69e9 100644 --- a/src/templates/index.html +++ b/src/templates/index.html @@ -19,7 +19,7 @@ src="https://cdn.datatables.net/plug-ins/1.12.1/sorting/custom-data-source/dom-text.js"> - + From 0a3f97c1431cd50aae977965862d77ef790c42af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20VALENTIN?= Date: Mon, 12 Jan 2026 20:38:22 +0100 Subject: [PATCH 2/3] chore: bump version to 0.13.3 Co-Authored-By: Claude Opus 4.5 --- src/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/__version__.py b/src/__version__.py index 8b0390b3..f404a742 100644 --- a/src/__version__.py +++ b/src/__version__.py @@ -1,3 +1,3 @@ """Application version update by Semantic Release.""" -VERSION = "0.13.2" +VERSION = "0.13.3" From 046a5371f11d02da5bb028bd62e6282fc3fa0475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20VALENTIN?= Date: Wed, 14 Jan 2026 11:02:54 +0100 Subject: [PATCH 3/3] feat: add VictoriaMetrics support - Add vm_mode configuration option for VictoriaMetrics compatibility - Skip health check when using VictoriaMetrics (not supported) - Skip Flux queries in vm_mode (not supported by VictoriaMetrics) - Simplify write_options initialization with dict.get() - Bump version to 0.13.4 Co-Authored-By: Claude Opus 4.5 --- README.md | 4 +- config.exemple.yaml | 3 +- pyproject.toml | 2 +- src/__version__.py | 2 +- src/init.py | 17 ++--- src/models/influxdb.py | 146 +++++++++++++++-------------------------- 6 files changed, 66 insertions(+), 108 deletions(-) diff --git a/README.md b/README.md index f4083ada..db691c38 100755 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Toutes les données seront importées dans un cache local (SQLite) ou externe (P - MQTT - Home Assistant (Via l'auto-discovery MQTT) -- InfluxDB +- InfluxDB / VictoriaMetrics - PostgresSQL L'outil possède également des APIs. @@ -58,7 +58,7 @@ All data are import in local cache (SQLite) or external backend (PostgreSQL) whi - MQTT - Home Assistant (auto-discovery MQTT) -- InfluxDB +- InfluxDB / VictoriaMetrics - PostgresSQL The tool also has APIs. diff --git a/config.exemple.yaml b/config.exemple.yaml index 7d704030..65f976d7 100755 --- a/config.exemple.yaml +++ b/config.exemple.yaml @@ -25,13 +25,14 @@ influxdb: enable: false scheme: http hostname: influxdb - port: 8086 + port: 8086 # 8086 pour influxdb, 8428 pour victoriametrics token: myelectricaldata org: myelectricaldata bucket: myelectricaldata # ATTENTION, L'activation de l'importation asynchrone va réduire fortement le temps d'importation dans InfluxDB # mais va augmenter la consommation mémoire & CPU et donc à activer uniquement sur un hardware robuste. method: synchronous # Mode disponible : synchronous / asynchronous / batching + vm_mode: true # Active le mode VictoriaMetrics true sinon false pour influxdb # batching_options permet uniquement de configurer la methode `batching`. # Pour plus d'information : https://github.com/influxdata/influxdb-client-python#batching batching_options: diff --git a/pyproject.toml b/pyproject.toml index dc96da14..49b957cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api" # POETRY CONFIGURATION [tool.poetry] name = "myelectricaldata-import" -version = "0.13.2" +version = "0.13.4" description = "MyElectricalData official client" authors = ["Clément VALENTIN "] readme = "README.md" diff --git a/src/__version__.py b/src/__version__.py index f404a742..258f1ca4 100644 --- a/src/__version__.py +++ b/src/__version__.py @@ -1,3 +1,3 @@ """Application version update by Semantic Release.""" -VERSION = "0.13.3" +VERSION = "0.13.4" diff --git a/src/init.py b/src/init.py index ffea92eb..db7810bd 100644 --- a/src/init.py +++ b/src/init.py @@ -89,19 +89,15 @@ def filter(self, record: logging.LogRecord) -> bool: INFLUXDB_CONFIG = CONFIG.influxdb_config() if INFLUXDB_CONFIG and "enable" in INFLUXDB_CONFIG and str2bool(INFLUXDB_CONFIG["enable"]): INFLUXB_ENABLE = True - if "method" in INFLUXDB_CONFIG: - method = INFLUXDB_CONFIG["method"] - else: - method = "SYNCHRONOUS" + method = INFLUXDB_CONFIG.get("method", "SYNCHRONOUS") + scheme = INFLUXDB_CONFIG.get("scheme", "http") + write_options = INFLUXDB_CONFIG.get("batching_options", {}) + vm_mode = str2bool(INFLUXDB_CONFIG.get("vm_mode", False)) - if "scheme" not in INFLUXDB_CONFIG: - INFLUXDB_CONFIG["scheme"] = "http" + logging.info(f"Connexion à la base de données : {scheme}://{INFLUXDB_CONFIG['hostname']}:{INFLUXDB_CONFIG['port']} (vm_mode={vm_mode})") - write_options = [] - if "batching_options" in INFLUXDB_CONFIG: - write_options = INFLUXDB_CONFIG["batching_options"] INFLUXDB = InfluxDB( - scheme=INFLUXDB_CONFIG["scheme"], + scheme=scheme, hostname=INFLUXDB_CONFIG["hostname"], port=INFLUXDB_CONFIG["port"], token=INFLUXDB_CONFIG["token"], @@ -109,6 +105,7 @@ def filter(self, record: logging.LogRecord) -> bool: bucket=INFLUXDB_CONFIG["bucket"], method=method, write_options=write_options, + vm_mode=vm_mode, ) if CONFIG.get("wipe_influxdb"): INFLUXDB.purge_influxdb() diff --git a/src/models/influxdb.py b/src/models/influxdb.py index d7865c0c..9fa078b2 100644 --- a/src/models/influxdb.py +++ b/src/models/influxdb.py @@ -3,7 +3,6 @@ import influxdb_client from dateutil.tz import tzlocal -from influxdb_client.client.util import date_utils from influxdb_client.client.util.date_utils import DateHelper from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS @@ -21,6 +20,7 @@ def __init__( bucket: str = "myelectricaldata", method="SYNCHRONOUS", write_options=None, + vm_mode=False, ): if write_options is None: write_options = {} @@ -30,45 +30,23 @@ def __init__( self.token = token self.org = org self.bucket = bucket + self.vm_mode = vm_mode self.influxdb = {} self.query_api = {} self.write_api = {} self.delete_api = {} self.buckets_api = {} self.method = method - self.write_options = {} - if "batch_size" in write_options: - self.write_options["batch_size"] = write_options["batch_size"] - else: - self.write_options["batch_size"] = 1000 - if "flush_interval" in write_options: - self.write_options["flush_interval"] = write_options["flush_interval"] - else: - self.write_options["flush_interval"] = 1000 - if "jitter_interval" in write_options: - self.write_options["jitter_interval"] = write_options["jitter_interval"] - else: - self.write_options["jitter_interval"] = 0 - if "retry_interval" in write_options: - self.write_options["retry_interval"] = write_options["retry_interval"] - else: - self.write_options["retry_interval"] = 5000 - if "max_retry_time" in write_options: - self.write_options["max_retry_time"] = write_options["max_retry_time"] - else: - self.write_options["max_retry_time"] = "180_000" - if "max_retries" in write_options: - self.write_options["max_retries"] = write_options["max_retries"] - else: - self.write_options["max_retries"] = 5 - if "max_retry_delay" in write_options: - self.write_options["max_retry_delay"] = write_options["max_retry_delay"] - else: - self.write_options["max_retry_delay"] = 125_000 - if "exponential_base" in write_options: - self.write_options["exponential_base"] = write_options["exponential_base"] - else: - self.write_options["exponential_base"] = 2 + self.write_options = { + "batch_size": write_options.get("batch_size", 1000), + "flush_interval": write_options.get("flush_interval", 1000), + "jitter_interval": write_options.get("jitter_interval", 0), + "retry_interval": write_options.get("retry_interval", 5000), + "max_retry_time": write_options.get("max_retry_time", "180_000"), + "max_retries": write_options.get("max_retries", 5), + "max_retry_delay": write_options.get("max_retry_delay", 125_000), + "exponential_base": write_options.get("exponential_base", 2), + } self.connect() self.retention = 0 self.max_retention = None @@ -76,42 +54,38 @@ def __init__( if self.retention != 0: day = int(self.retention / 60 / 60 / 24) logging.warning(f" ATTENTION, InfluxDB est configuré avec une durée de rétention de {day} jours.") - logging.warning( - f" Toutes les données supérieures à {day} jours ne seront jamais insérées dans celui-ci." - ) + logging.warning(f" Toutes les données supérieures à {day} jours ne seront jamais insérées dans celui-ci.") else: logging.warning(" => Aucune durée de rétention de données détectée.") def connect(self): separator() - logging.info(f"Connect to InfluxDB {self.hostname}:{self.port}") - date_utils.date_helper = DateHelper(timezone=tzlocal()) + logging.info(f"Connexion à la base de données : {self.scheme}://{self.hostname}:{self.port} (vm_mode={self.vm_mode})") + self.influxdb = influxdb_client.InfluxDBClient( url=f"{self.scheme}://{self.hostname}:{self.port}", token=self.token, org=self.org, timeout="600000", ) - health = self.influxdb.health() - if health.status == "pass": - title("Connection success") - else: - logging.critical( - """ - -Impossible de se connecter à la base influxdb. -Vous pouvez récupérer un exemple ici : -https://github.com/m4dm4rtig4n/enedisgateway2mqtt#configuration-file -""" - ) - exit(1) + if not self.vm_mode: + try: + health = self.influxdb.health() + if health.status == "pass": + title("Connection success") + else: + logging.critical("Impossible de se connecter à InfluxDB.") + exit(1) + except Exception as e: + logging.critical(f"Erreur de connexion InfluxDB : {e}") + exit(1) + else: + title("Mode VictoriaMetrics actif (pas de vérification de santé)") title(f"Méthode d'importation : {self.method.upper()}") if self.method.upper() == "ASYNCHRONOUS": - logging.warning( - ' ATTENTION, le mode d\'importation "ASYNCHRONOUS" est très consommateur de ressources système.' - ) + logging.warning(' ATTENTION, le mode "ASYNCHRONOUS" est très consommateur de ressources système.') self.write_api = self.influxdb.write_api(write_options=ASYNCHRONOUS) elif self.method.upper() == "SYNCHRONOUS": self.write_api = self.influxdb.write_api(write_options=SYNCHRONOUS) @@ -127,55 +101,50 @@ def connect(self): exponential_base=self.write_options["exponential_base"], ) ) + self.query_api = self.influxdb.query_api() self.delete_api = self.influxdb.delete_api() self.buckets_api = self.influxdb.buckets_api() - self.get_list_retention_policies() def purge_influxdb(self): separator_warning() - logging.warning(f"Wipe influxdb database {self.hostname}:{self.port}") + logging.warning(f"Suppression des données InfluxDB {self.hostname}:{self.port}") start = "1970-01-01T00:00:00Z" stop = datetime.datetime.utcnow() - measurement = [ - "consumption", - "production", - "consumption_detail", - "production_detail", - ] + measurement = ["consumption", "production", "consumption_detail", "production_detail"] for mesure in measurement: self.delete_api.delete(start, stop, f'_measurement="{mesure}"', self.bucket, org=self.org) - # CONFIG.set("wipe_influxdb", False) - logging.warning(f" => Data reset") + logging.warning(" => Données supprimées") def get_list_retention_policies(self): - if self.org == f"-": # InfluxDB 1.8 + if self.org == "-" or self.vm_mode: self.retention = 0 - self.max_retention = 0 - return + self.max_retention = datetime.datetime.now() else: - buckets = self.buckets_api.find_buckets().buckets - for bucket in buckets: - if bucket.name == self.bucket: - self.retention = bucket.retention_rules[0].every_seconds - self.max_retention = datetime.datetime.now() - datetime.timedelta(seconds=self.retention) + try: + buckets = self.buckets_api.find_buckets().buckets + for bucket in buckets: + if bucket.name == self.bucket: + self.retention = bucket.retention_rules[0].every_seconds + self.max_retention = datetime.datetime.now() - datetime.timedelta(seconds=self.retention) + except Exception as e: + logging.warning(f"Impossible de récupérer les règles de rétention : {e}") + self.retention = 0 + self.max_retention = datetime.datetime.now() def get(self, start, end, measurement): - if self.org != f"-": + if self.org != "-" and not self.vm_mode: query = f""" from(bucket: "{self.bucket}") |> range(start: {start}, stop: {end}) |> filter(fn: (r) => r["_measurement"] == "{measurement}") """ logging.debug(query) - output = self.query_api.query(query) - else: - # Skip for InfluxDB 1.8 - output = [] - return output + return self.query_api.query(query) + return [] def count(self, start, end, measurement): - if self.org != f"-": + if self.org != "-" and not self.vm_mode: query = f""" from(bucket: "{self.bucket}") |> range(start: {start}, stop: {end}) @@ -185,11 +154,8 @@ def count(self, start, end, measurement): |> yield(name: "count") """ logging.debug(query) - output = self.query_api.query(query) - else: - # Skip for InfluxDB 1.8 - output = [] - return output + return self.query_api.query(query) + return [] def delete(self, date, measurement): self.delete_api.delete(date, date, f'_measurement="{measurement}"', self.bucket, org=self.org) @@ -204,13 +170,7 @@ def write(self, tags, date=None, fields=None, measurement="log"): record = { "measurement": measurement, "time": date_object, - "tags": {}, - "fields": {}, + "tags": tags or {}, + "fields": fields or {}, } - if tags: - for key, value in tags.items(): - record["tags"][key] = value - if fields is not None: - for key, value in fields.items(): - record["fields"][key] = value self.write_api.write(bucket=self.bucket, org=self.org, record=record)