From e6133ce1222be1f05309ebc06f61ccfc546f903f Mon Sep 17 00:00:00 2001 From: rleidner Date: Mon, 7 Apr 2025 10:43:30 +0200 Subject: [PATCH 1/5] extend soc module vwid by skoda api --- packages/modules/vehicles/vwid/api.py | 58 ++++- packages/modules/vehicles/vwid/libskoda.py | 241 +++++++++++++++++++++ 2 files changed, 289 insertions(+), 10 deletions(-) create mode 100755 packages/modules/vehicles/vwid/libskoda.py diff --git a/packages/modules/vehicles/vwid/api.py b/packages/modules/vehicles/vwid/api.py index bf4198c294..37cbf39b5d 100755 --- a/packages/modules/vehicles/vwid/api.py +++ b/packages/modules/vehicles/vwid/api.py @@ -3,6 +3,7 @@ from logging import getLogger from typing import Union from modules.vehicles.vwid import libvwid +from modules.vehicles.vwid import libskoda import aiohttp from asyncio import new_event_loop, set_event_loop from time import time, mktime @@ -33,7 +34,7 @@ def __init__(self): self.su = socUtils() pass - # async method, called from sync fetch_soc, required because libvwid expects async environment + # async method, called from sync fetch_soc, required because libvwid/libskoda expect async environment async def _fetch_soc(self, conf: VWId, vehicle: int) -> Union[int, float, str]: @@ -45,8 +46,28 @@ async def _fetch_soc(self, self.accessTokenFile = str(RAMDISK_PATH) + '/soc_vwid_accessToken_vh_' + str(vehicle) self.accessToken_old = {} + brand = self.vin[:3] + log.info("brand = " + brand) + if brand == "WVW": + self.brand = "vwid" + elif brand == "SMB": + self.brand = "skoda" + else: + log.error("Brand " + brand + " is not one of WVW, SMB") + self.soc = 0 + self.range = 0.0 + self.soc_ts = "" + self.soc_tsX = time() + return self.soc, self.range, self.soc_ts, self.soc_tsX + async with aiohttp.ClientSession() as self.session: - self.w = libvwid.vwid(self.session) + if self.brand == "vwid": + self.w = libvwid.vwid(self.session) + elif self.brand == "": + self.w = libskoda.vwid(self.session) + else: + log.error("Brand " + self.brand + " is not one of vwid, skoda") + self.w.set_vin(self.vin) self.w.set_credentials(self.user_id, self.password) self.w.set_jobs(['charging']) @@ -83,16 +104,33 @@ async def _fetch_soc(self, + dumps(self.data['userCapabilities']['capabilitiesStatus']['error'], ensure_ascii=False, indent=4)) - if self.su.keys_exist(self.data, 'charging', 'batteryStatus'): - log.debug("batteryStatus: \n" + - dumps(self.data['charging']['batteryStatus'], - ensure_ascii=False, indent=4)) + if self.brand == "vwid": + if self.su.keys_exist(self.data, 'charging', 'batteryStatus'): + log.debug("batteryStatus: \n" + + dumps(self.data['charging']['batteryStatus'], + ensure_ascii=False, indent=4)) + elif self.brand == "skoda": + if self.su.keys_exist(self.data, 'primaryEngineRange'): + log.debug("batteryStatus: \n" + + dumps(self.data['primaryEngineRange'], + ensure_ascii=False, indent=4)) + else: + log.error("Brand " + self.brand + " is not one of vwid, skoda") try: - self.soc = int(self.data['charging']['batteryStatus']['value']['currentSOC_pct']) - self.range = float(self.data['charging']['batteryStatus']['value']['cruisingRangeElectric_km']) - soc_tsZ = self.data['charging']['batteryStatus']['value']['carCapturedTimestamp'] - soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + "Z") + if self.brand == "vwid": + self.soc = int(self.data['charging']['batteryStatus']['value']['currentSOC_pct']) + self.range = float(self.data['charging']['batteryStatus']['value']['cruisingRangeElectric_km']) + soc_tsZ = self.data['charging']['batteryStatus']['value']['carCapturedTimestamp'] + soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + "Z") + elif self.brand == "skoda": + self.soc = int(self.data['primaryEngineRange']['currentSoCInPercent']) + self.range = float(self.data['primaryEngineRange']['remainingRangeInKm']) + soc_tsZ = self.data['carCapturedTimestamp'] + soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + ".%fZ") + else: + raise "Brand " + self.brand + " is not one of vwid, skoda" + soc_tsdtL = utc2local(soc_tsdtZ) self.soc_tsX = datetime.timestamp(soc_tsdtL) self.soc_ts = datetime.strftime(soc_tsdtL, ts_fmt) diff --git a/packages/modules/vehicles/vwid/libskoda.py b/packages/modules/vehicles/vwid/libskoda.py new file mode 100755 index 0000000000..e17e970f05 --- /dev/null +++ b/packages/modules/vehicles/vwid/libskoda.py @@ -0,0 +1,241 @@ +# A Python class to communicate with the "Skoda Connect" API. +# Adapted the libvwid.py module to skoda interface + +import secrets +import logging +import json +import uuid +import base64 +import hashlib + +from helpermodules.utils.error_handling import ImportErrorContext +with ImportErrorContext(): + import lxml.html + +# Constants +LOGIN_BASE = "https://identity.vwgroup.io/oidc/v1" +LOGIN_HANDLER_BASE = "https://identity.vwgroup.io" +API_BASE = "https://mysmob.api.connect.skoda-auto.cz/api" +CLIENT_ID = "7f045eee-7003-4379-9968-9355ed2adb06@apps_vw-dilab_com" + + +class vwid: + def __init__(self, session): + self.session = session + self.headers = {} + self.log = logging.getLogger(__name__) + self.jobs_string = 'all' + + def form_from_response(self, text): + page = lxml.html.fromstring(text) + elements = page.xpath('//form//input[@type="hidden"]') + form = {x.attrib['name']: x.attrib['value'] for x in elements} + return (form, page.forms[0].action) + + def password_form(self, text): + page = lxml.html.fromstring(text) + elements = page.xpath('//script') + + # Todo: Find more elegant way parse this... + objects = {} + for a in elements: + if (a.text) and (a.text.find('window._IDK') != -1): + text = a.text.strip() + text = text[text.find('\n'):text.rfind('\n')].strip() + for line in text.split('\n'): + try: + (name, val) = line.strip().split(':', 1) + except ValueError: + continue + val = val.strip('\', ') + objects[name] = val + + json_model = json.loads(objects['templateModel']) + + if ('errorCode' in json_model): + self.log.error("Login error: %s", json_model['errorCode']) + return False + + try: + # Generate form + form = {} + form['relayState'] = json_model['relayState'] + form['hmac'] = json_model['hmac'] + form['email'] = json_model['emailPasswordForm']['email'] + form['_csrf'] = objects['csrf_token'] + + # Generate URL action + action = '/signin-service/v1/%s/%s'\ + % (json_model['clientLegalEntityModel']['clientId'], json_model['postAction']) + + return (form, action) + + except KeyError: + self.log.exception("Missing fields in response from VW API") + return False + + def set_vin(self, vin): + self.vin = vin + + def set_credentials(self, username, password): + self.username = username + self.password = password + + def set_jobs(self, jobs): + self.jobs_string = ','.join(jobs) + + def get_code_challenge(self): + code_verifier = secrets.token_urlsafe(64).replace('+', '-').replace('/', '_').replace('=', '') + code_challenge = base64.b64encode(hashlib.sha256(code_verifier.encode('utf-8')).digest()) + code_challenge = code_challenge.decode('utf-8').replace('+', '-').replace('/', '_').replace('=', '') + return (code_verifier, code_challenge) + + async def connect(self, username, password): + self.set_credentials(username, password) + return (await self.reconnect()) + + async def reconnect(self): + # Get code challenge and verifier + code_verifier, code_challenge = self.get_code_challenge() + + # Get authorize page + _scope = 'address badge birthdate cars driversLicense dealers email mileage mbb nationalIdentifier' + _scope = _scope + ' openid phone profession profile vin' + payload = { + 'client_id': CLIENT_ID, + 'scope': _scope, + 'response_type': 'code id_token', + 'nonce': secrets.token_urlsafe(12), + 'redirect_uri': 'myskoda://redirect/login/', + 'state': str(uuid.uuid4()), + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256' + } + + response = await self.session.get(LOGIN_BASE + '/authorize', params=payload) + if response.status >= 400: + self.log.error(f"Authorize: Non-2xx response ({response.status})") + # Non 2xx response, failed + return False + + # Fill form with email (username) + (form, action) = self.form_from_response(await response.read()) + form['email'] = self.username + response = await self.session.post(LOGIN_HANDLER_BASE+action, data=form) + if response.status >= 400: + self.log.error("Email: Non-2xx response") + return False + + # Fill form with password + (form, action) = self.password_form(await response.read()) + form['password'] = self.password + url = LOGIN_HANDLER_BASE + action + response = await self.session.post(url, data=form, allow_redirects=False) + + # Can get a 303 redirect for a "terms and conditions" page + if (response.status == 303): + url = response.headers['Location'] + if ("terms-and-conditions" in url): + # Get terms and conditions page + url = LOGIN_HANDLER_BASE + url + response = await self.session.get(url, data=form, allow_redirects=False) + (form, action) = self.form_from_response(await response.read()) + + url = LOGIN_HANDLER_BASE + action + response = await self.session.post(url, data=form, allow_redirects=False) + + self.log.warn("Agreed to terms and conditions") + else: + self.log.error("Got unknown 303 redirect") + return False + + # Handle every single redirect and stop if the redirect + # URL uses the weconnect adapter. + while (True): + url = response.headers['Location'] + if (url.split(':')[0] == "myskoda"): + if not ('id_token' in url): + self.log.error("Missing id token") + return False + # Parse query string + query_string = url.split('#')[1] + query = {x[0]: x[1] for x in [x.split("=") for x in query_string.split("&")]} + break + + if (response.status != 302): + self.log.error("Not redirected, status %u" % response.status) + return False + + response = await self.session.get(url, data=form, allow_redirects=False) + + self.headers = dict(response.headers) + + # Get final token + params = { + 'tokenType': 'CONNECT' + } + payload = { + 'code': query['code'], + 'redirectUri': "myskoda://redirect/login/", + 'verifier': code_verifier + } + response = await self.session.post(API_BASE + '/v1/authentication/exchange-authorization-code', + params=params, json=payload) + if response.status >= 400: + self.log.error("Login: Non-2xx response") + # Non 2xx response, failed + return False + self.tokens = await response.json() + + # Update header with final token + self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"] + + # Success + return True + + async def refresh_tokens(self): + if not self.headers: + return False + + params = { + 'tokenType': 'CONNECT' + } + # Use the refresh token + payload = { + 'token': self.tokens["refreshToken"] + } + + response = await self.session.post(API_BASE + '/v1/authentication/refresh-token', params=params, json=payload) + if response.status >= 400: + return False + self.tokens = await response.json() + + # Use the newly received access token + self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"] + + return True + + async def get_status(self): + status_url = f"{API_BASE}/v2/vehicle-status/{self.vin}/driving-range" + response = await self.session.get(status_url, headers=self.headers) + + # If first attempt fails, try to refresh tokens + if response.status >= 400: + self.log.debug("Refreshing tokens") + if await self.refresh_tokens(): + response = await self.session.get(status_url, headers=self.headers) + + # If refreshing tokens failed, try a full reconnect + if response.status >= 400: + self.log.info("Reconnecting") + if await self.reconnect(): + response = await self.session.get(status_url, headers=self.headers) + else: + self.log.error("Reconnect failed") + return {} + + if response.status >= 400: + self.log.error("Get status failed") + return {} + + return (await response.json()) From 22a81e1344a02078023e7f20ec181f56f8e93fc5 Mon Sep 17 00:00:00 2001 From: rleidner Date: Mon, 7 Apr 2025 10:46:04 +0200 Subject: [PATCH 2/5] SMB -> TMB --- packages/modules/vehicles/vwid/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/modules/vehicles/vwid/api.py b/packages/modules/vehicles/vwid/api.py index 37cbf39b5d..b2ccc67977 100755 --- a/packages/modules/vehicles/vwid/api.py +++ b/packages/modules/vehicles/vwid/api.py @@ -50,10 +50,10 @@ async def _fetch_soc(self, log.info("brand = " + brand) if brand == "WVW": self.brand = "vwid" - elif brand == "SMB": + elif brand == "TMB": self.brand = "skoda" else: - log.error("Brand " + brand + " is not one of WVW, SMB") + log.error("Brand " + brand + " is not one of WVW, TMB") self.soc = 0 self.range = 0.0 self.soc_ts = "" From 9d14a71820f3aeaac41a96c94f06c64b8f691c61 Mon Sep 17 00:00:00 2001 From: rleidner Date: Mon, 7 Apr 2025 12:20:07 +0200 Subject: [PATCH 3/5] add missing skoda --- packages/modules/vehicles/vwid/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/modules/vehicles/vwid/api.py b/packages/modules/vehicles/vwid/api.py index b2ccc67977..b25f8ec72d 100755 --- a/packages/modules/vehicles/vwid/api.py +++ b/packages/modules/vehicles/vwid/api.py @@ -63,7 +63,7 @@ async def _fetch_soc(self, async with aiohttp.ClientSession() as self.session: if self.brand == "vwid": self.w = libvwid.vwid(self.session) - elif self.brand == "": + elif self.brand == "skoda": self.w = libskoda.vwid(self.session) else: log.error("Brand " + self.brand + " is not one of vwid, skoda") From 773dddfdfd722af6ef009b7dd8595cc95c6a67c1 Mon Sep 17 00:00:00 2001 From: rleidner Date: Tue, 8 Apr 2025 21:37:58 +0200 Subject: [PATCH 4/5] fix state: valid access token but no refressh token --- packages/modules/vehicles/vwid/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/modules/vehicles/vwid/api.py b/packages/modules/vehicles/vwid/api.py index b25f8ec72d..7b585d3299 100755 --- a/packages/modules/vehicles/vwid/api.py +++ b/packages/modules/vehicles/vwid/api.py @@ -161,7 +161,7 @@ async def _fetch_soc(self, else: log.debug("Old refreshToken expires on " + self.expOld_dt + ", keep it") - else: + elif self.refreshTokenNew != initialToken: self.store_refreshToken = True # no old refreshToken, store new refreshToken anyway if self.store_refreshToken: # refreshToken needs to be stored in config json From 2148129cf0d1b84112350bbde0624246bb4e1c72 Mon Sep 17 00:00:00 2001 From: rleidner Date: Tue, 8 Apr 2025 23:37:39 +0200 Subject: [PATCH 5/5] restart failed build process --- packages/modules/vehicles/vwid/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/modules/vehicles/vwid/api.py b/packages/modules/vehicles/vwid/api.py index 7b585d3299..220bd78689 100755 --- a/packages/modules/vehicles/vwid/api.py +++ b/packages/modules/vehicles/vwid/api.py @@ -46,6 +46,7 @@ async def _fetch_soc(self, self.accessTokenFile = str(RAMDISK_PATH) + '/soc_vwid_accessToken_vh_' + str(vehicle) self.accessToken_old = {} + # derive brand from VIN brand = self.vin[:3] log.info("brand = " + brand) if brand == "WVW":