Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Toutes les données seront importées dans un cache local (SQLite) ou externe (P

- MQTT
- Home Assistant (Via l'auto-discovery MQTT)
- InfluxDB
- InfluxDB / VictoriaMetrics
- PostgresSQL

L'outil possède également des APIs.
Expand Down Expand Up @@ -58,7 +58,7 @@ All data are import in local cache (SQLite) or external backend (PostgreSQL) whi

- MQTT
- Home Assistant (auto-discovery MQTT)
- InfluxDB
- InfluxDB / VictoriaMetrics
- PostgresSQL

The tool also has APIs.
Expand Down
3 changes: 2 additions & 1 deletion config.exemple.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ influxdb:
enable: false
scheme: http
hostname: influxdb
port: 8086
port: 8086 # 8086 pour influxdb, 8428 pour victoriametrics
token: myelectricaldata
org: myelectricaldata
bucket: myelectricaldata
# ATTENTION, L'activation de l'importation asynchrone va réduire fortement le temps d'importation dans InfluxDB
# mais va augmenter la consommation mémoire & CPU et donc à activer uniquement sur un hardware robuste.
method: synchronous # Mode disponible : synchronous / asynchronous / batching
vm_mode: true # Active le mode VictoriaMetrics true sinon false pour influxdb
# batching_options permet uniquement de configurer la methode `batching`.
# Pour plus d'information : https://github.com/influxdata/influxdb-client-python#batching
batching_options:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
# POETRY CONFIGURATION
[tool.poetry]
name = "myelectricaldata-import"
version = "0.13.2"
version = "0.13.4"
description = "MyElectricalData official client"
authors = ["Clément VALENTIN <m4dm4rtig4n@gmail.com>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion src/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Application version update by Semantic Release."""

VERSION = "0.13.2"
VERSION = "0.13.4"
17 changes: 7 additions & 10 deletions src/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,23 @@ def filter(self, record: logging.LogRecord) -> bool:
INFLUXDB_CONFIG = CONFIG.influxdb_config()
if INFLUXDB_CONFIG and "enable" in INFLUXDB_CONFIG and str2bool(INFLUXDB_CONFIG["enable"]):
INFLUXB_ENABLE = True
if "method" in INFLUXDB_CONFIG:
method = INFLUXDB_CONFIG["method"]
else:
method = "SYNCHRONOUS"
method = INFLUXDB_CONFIG.get("method", "SYNCHRONOUS")
scheme = INFLUXDB_CONFIG.get("scheme", "http")
write_options = INFLUXDB_CONFIG.get("batching_options", {})
vm_mode = str2bool(INFLUXDB_CONFIG.get("vm_mode", False))

if "scheme" not in INFLUXDB_CONFIG:
INFLUXDB_CONFIG["scheme"] = "http"
logging.info(f"Connexion à la base de données : {scheme}://{INFLUXDB_CONFIG['hostname']}:{INFLUXDB_CONFIG['port']} (vm_mode={vm_mode})")

write_options = []
if "batching_options" in INFLUXDB_CONFIG:
write_options = INFLUXDB_CONFIG["batching_options"]
INFLUXDB = InfluxDB(
scheme=INFLUXDB_CONFIG["scheme"],
scheme=scheme,
hostname=INFLUXDB_CONFIG["hostname"],
port=INFLUXDB_CONFIG["port"],
token=INFLUXDB_CONFIG["token"],
org=INFLUXDB_CONFIG["org"],
bucket=INFLUXDB_CONFIG["bucket"],
method=method,
write_options=write_options,
vm_mode=vm_mode,
)
if CONFIG.get("wipe_influxdb"):
INFLUXDB.purge_influxdb()
Expand Down
146 changes: 53 additions & 93 deletions src/models/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import influxdb_client
from dateutil.tz import tzlocal
from influxdb_client.client.util import date_utils
from influxdb_client.client.util.date_utils import DateHelper
from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS

Expand All @@ -21,6 +20,7 @@ def __init__(
bucket: str = "myelectricaldata",
method="SYNCHRONOUS",
write_options=None,
vm_mode=False,
):
if write_options is None:
write_options = {}
Expand All @@ -30,88 +30,62 @@ def __init__(
self.token = token
self.org = org
self.bucket = bucket
self.vm_mode = vm_mode
self.influxdb = {}
self.query_api = {}
self.write_api = {}
self.delete_api = {}
self.buckets_api = {}
self.method = method
self.write_options = {}
if "batch_size" in write_options:
self.write_options["batch_size"] = write_options["batch_size"]
else:
self.write_options["batch_size"] = 1000
if "flush_interval" in write_options:
self.write_options["flush_interval"] = write_options["flush_interval"]
else:
self.write_options["flush_interval"] = 1000
if "jitter_interval" in write_options:
self.write_options["jitter_interval"] = write_options["jitter_interval"]
else:
self.write_options["jitter_interval"] = 0
if "retry_interval" in write_options:
self.write_options["retry_interval"] = write_options["retry_interval"]
else:
self.write_options["retry_interval"] = 5000
if "max_retry_time" in write_options:
self.write_options["max_retry_time"] = write_options["max_retry_time"]
else:
self.write_options["max_retry_time"] = "180_000"
if "max_retries" in write_options:
self.write_options["max_retries"] = write_options["max_retries"]
else:
self.write_options["max_retries"] = 5
if "max_retry_delay" in write_options:
self.write_options["max_retry_delay"] = write_options["max_retry_delay"]
else:
self.write_options["max_retry_delay"] = 125_000
if "exponential_base" in write_options:
self.write_options["exponential_base"] = write_options["exponential_base"]
else:
self.write_options["exponential_base"] = 2
self.write_options = {
"batch_size": write_options.get("batch_size", 1000),
"flush_interval": write_options.get("flush_interval", 1000),
"jitter_interval": write_options.get("jitter_interval", 0),
"retry_interval": write_options.get("retry_interval", 5000),
"max_retry_time": write_options.get("max_retry_time", "180_000"),
"max_retries": write_options.get("max_retries", 5),
"max_retry_delay": write_options.get("max_retry_delay", 125_000),
"exponential_base": write_options.get("exponential_base", 2),
}
self.connect()
self.retention = 0
self.max_retention = None
self.get_list_retention_policies()
if self.retention != 0:
day = int(self.retention / 60 / 60 / 24)
logging.warning(f"<!> ATTENTION, InfluxDB est configuré avec une durée de rétention de {day} jours.")
logging.warning(
f" Toutes les données supérieures à {day} jours ne seront jamais insérées dans celui-ci."
)
logging.warning(f" Toutes les données supérieures à {day} jours ne seront jamais insérées dans celui-ci.")
else:
logging.warning(" => Aucune durée de rétention de données détectée.")

def connect(self):
separator()
logging.info(f"Connect to InfluxDB {self.hostname}:{self.port}")
date_utils.date_helper = DateHelper(timezone=tzlocal())
logging.info(f"Connexion à la base de données : {self.scheme}://{self.hostname}:{self.port} (vm_mode={self.vm_mode})")

self.influxdb = influxdb_client.InfluxDBClient(
url=f"{self.scheme}://{self.hostname}:{self.port}",
token=self.token,
org=self.org,
timeout="600000",
)
health = self.influxdb.health()
if health.status == "pass":
title("Connection success")
else:
logging.critical(
"""

Impossible de se connecter à la base influxdb.

Vous pouvez récupérer un exemple ici :
https://github.com/m4dm4rtig4n/enedisgateway2mqtt#configuration-file
"""
)
exit(1)
if not self.vm_mode:
try:
health = self.influxdb.health()
if health.status == "pass":
title("Connection success")
else:
logging.critical("Impossible de se connecter à InfluxDB.")
exit(1)
except Exception as e:
logging.critical(f"Erreur de connexion InfluxDB : {e}")
exit(1)
else:
title("Mode VictoriaMetrics actif (pas de vérification de santé)")

title(f"Méthode d'importation : {self.method.upper()}")
if self.method.upper() == "ASYNCHRONOUS":
logging.warning(
' <!> ATTENTION, le mode d\'importation "ASYNCHRONOUS" est très consommateur de ressources système.'
)
logging.warning(' <!> ATTENTION, le mode "ASYNCHRONOUS" est très consommateur de ressources système.')
self.write_api = self.influxdb.write_api(write_options=ASYNCHRONOUS)
elif self.method.upper() == "SYNCHRONOUS":
self.write_api = self.influxdb.write_api(write_options=SYNCHRONOUS)
Expand All @@ -127,55 +101,50 @@ def connect(self):
exponential_base=self.write_options["exponential_base"],
)
)

self.query_api = self.influxdb.query_api()
self.delete_api = self.influxdb.delete_api()
self.buckets_api = self.influxdb.buckets_api()
self.get_list_retention_policies()

def purge_influxdb(self):
separator_warning()
logging.warning(f"Wipe influxdb database {self.hostname}:{self.port}")
logging.warning(f"Suppression des données InfluxDB {self.hostname}:{self.port}")
start = "1970-01-01T00:00:00Z"
stop = datetime.datetime.utcnow()
measurement = [
"consumption",
"production",
"consumption_detail",
"production_detail",
]
measurement = ["consumption", "production", "consumption_detail", "production_detail"]
for mesure in measurement:
self.delete_api.delete(start, stop, f'_measurement="{mesure}"', self.bucket, org=self.org)
# CONFIG.set("wipe_influxdb", False)
logging.warning(f" => Data reset")
logging.warning(" => Données supprimées")

def get_list_retention_policies(self):
if self.org == f"-": # InfluxDB 1.8
if self.org == "-" or self.vm_mode:
self.retention = 0
self.max_retention = 0
return
self.max_retention = datetime.datetime.now()
else:
buckets = self.buckets_api.find_buckets().buckets
for bucket in buckets:
if bucket.name == self.bucket:
self.retention = bucket.retention_rules[0].every_seconds
self.max_retention = datetime.datetime.now() - datetime.timedelta(seconds=self.retention)
try:
buckets = self.buckets_api.find_buckets().buckets
for bucket in buckets:
if bucket.name == self.bucket:
self.retention = bucket.retention_rules[0].every_seconds
self.max_retention = datetime.datetime.now() - datetime.timedelta(seconds=self.retention)
except Exception as e:
logging.warning(f"Impossible de récupérer les règles de rétention : {e}")
self.retention = 0
self.max_retention = datetime.datetime.now()

def get(self, start, end, measurement):
if self.org != f"-":
if self.org != "-" and not self.vm_mode:
query = f"""
from(bucket: "{self.bucket}")
|> range(start: {start}, stop: {end})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
"""
logging.debug(query)
output = self.query_api.query(query)
else:
# Skip for InfluxDB 1.8
output = []
return output
return self.query_api.query(query)
return []

def count(self, start, end, measurement):
if self.org != f"-":
if self.org != "-" and not self.vm_mode:
query = f"""
from(bucket: "{self.bucket}")
|> range(start: {start}, stop: {end})
Expand All @@ -185,11 +154,8 @@ def count(self, start, end, measurement):
|> yield(name: "count")
"""
logging.debug(query)
output = self.query_api.query(query)
else:
# Skip for InfluxDB 1.8
output = []
return output
return self.query_api.query(query)
return []

def delete(self, date, measurement):
self.delete_api.delete(date, date, f'_measurement="{measurement}"', self.bucket, org=self.org)
Expand All @@ -204,13 +170,7 @@ def write(self, tags, date=None, fields=None, measurement="log"):
record = {
"measurement": measurement,
"time": date_object,
"tags": {},
"fields": {},
"tags": tags or {},
"fields": fields or {},
}
if tags:
for key, value in tags.items():
record["tags"][key] = value
if fields is not None:
for key, value in fields.items():
record["fields"][key] = value
self.write_api.write(bucket=self.bucket, org=self.org, record=record)
2 changes: 1 addition & 1 deletion src/templates/html/head.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.12.1/css/jquery.dataTables.css">

<!-- NOTIFY-->
<script src="https://rawgit.com/notifyjs/notifyjs/master/dist/notify.js"></script>
<script src="https://cdn.jsdelivr.net/gh/jpillora/notifyjs@master/dist/notify.min.js"></script>
<script src="/static/js/notif.js"></script>
<link rel="stylesheet" href="/static/css/notif.css">

Expand Down
2 changes: 1 addition & 1 deletion src/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
src="https://cdn.datatables.net/plug-ins/1.12.1/sorting/custom-data-source/dom-text.js"></script>
<link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.12.1/css/jquery.dataTables.css">
<!-- NOTIFY-->
<script src="https://rawgit.com/notifyjs/notifyjs/master/dist/notify.js"></script>
<script src="https://cdn.jsdelivr.net/gh/jpillora/notifyjs@master/dist/notify.min.js"></script>
<script src="/static/js/notif.js"></script>
<link rel="stylesheet" href="/static/css/notif.css">
<!-- GOOGLE FONT-->
Expand Down
Loading