Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions packages/modules/vehicles/vwid/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from logging import getLogger
from typing import Union
from modules.vehicles.vwid import libvwid
from modules.vehicles.vwid import libskoda
import aiohttp
from asyncio import new_event_loop, set_event_loop
from time import time, mktime
Expand Down Expand Up @@ -33,7 +34,7 @@ def __init__(self):
self.su = socUtils()
pass

# async method, called from sync fetch_soc, required because libvwid expects async environment
# async method, called from sync fetch_soc, required because libvwid/libskoda expect async environment
async def _fetch_soc(self,
conf: VWId,
vehicle: int) -> Union[int, float, str]:
Expand All @@ -45,8 +46,29 @@ async def _fetch_soc(self,
self.accessTokenFile = str(RAMDISK_PATH) + '/soc_vwid_accessToken_vh_' + str(vehicle)
self.accessToken_old = {}

# derive brand from VIN
brand = self.vin[:3]
log.info("brand = " + brand)
if brand == "WVW":
self.brand = "vwid"
elif brand == "TMB":
self.brand = "skoda"
else:
log.error("Brand " + brand + " is not one of WVW, TMB")
self.soc = 0
self.range = 0.0
self.soc_ts = ""
self.soc_tsX = time()
return self.soc, self.range, self.soc_ts, self.soc_tsX

async with aiohttp.ClientSession() as self.session:
self.w = libvwid.vwid(self.session)
if self.brand == "vwid":
self.w = libvwid.vwid(self.session)
elif self.brand == "skoda":
self.w = libskoda.vwid(self.session)
else:
log.error("Brand " + self.brand + " is not one of vwid, skoda")

self.w.set_vin(self.vin)
self.w.set_credentials(self.user_id, self.password)
self.w.set_jobs(['charging'])
Expand Down Expand Up @@ -83,16 +105,33 @@ async def _fetch_soc(self,
+ dumps(self.data['userCapabilities']['capabilitiesStatus']['error'],
ensure_ascii=False, indent=4))

if self.su.keys_exist(self.data, 'charging', 'batteryStatus'):
log.debug("batteryStatus: \n" +
dumps(self.data['charging']['batteryStatus'],
ensure_ascii=False, indent=4))
if self.brand == "vwid":
if self.su.keys_exist(self.data, 'charging', 'batteryStatus'):
log.debug("batteryStatus: \n" +
dumps(self.data['charging']['batteryStatus'],
ensure_ascii=False, indent=4))
elif self.brand == "skoda":
if self.su.keys_exist(self.data, 'primaryEngineRange'):
log.debug("batteryStatus: \n" +
dumps(self.data['primaryEngineRange'],
ensure_ascii=False, indent=4))
else:
log.error("Brand " + self.brand + " is not one of vwid, skoda")

try:
self.soc = int(self.data['charging']['batteryStatus']['value']['currentSOC_pct'])
self.range = float(self.data['charging']['batteryStatus']['value']['cruisingRangeElectric_km'])
soc_tsZ = self.data['charging']['batteryStatus']['value']['carCapturedTimestamp']
soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + "Z")
if self.brand == "vwid":
self.soc = int(self.data['charging']['batteryStatus']['value']['currentSOC_pct'])
self.range = float(self.data['charging']['batteryStatus']['value']['cruisingRangeElectric_km'])
soc_tsZ = self.data['charging']['batteryStatus']['value']['carCapturedTimestamp']
soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + "Z")
elif self.brand == "skoda":
self.soc = int(self.data['primaryEngineRange']['currentSoCInPercent'])
self.range = float(self.data['primaryEngineRange']['remainingRangeInKm'])
soc_tsZ = self.data['carCapturedTimestamp']
soc_tsdtZ = datetime.strptime(soc_tsZ, ts_fmt + ".%fZ")
else:
raise "Brand " + self.brand + " is not one of vwid, skoda"

soc_tsdtL = utc2local(soc_tsdtZ)
self.soc_tsX = datetime.timestamp(soc_tsdtL)
self.soc_ts = datetime.strftime(soc_tsdtL, ts_fmt)
Expand Down Expand Up @@ -123,7 +162,7 @@ async def _fetch_soc(self,

else:
log.debug("Old refreshToken expires on " + self.expOld_dt + ", keep it")
else:
elif self.refreshTokenNew != initialToken:
self.store_refreshToken = True # no old refreshToken, store new refreshToken anyway

if self.store_refreshToken: # refreshToken needs to be stored in config json
Expand Down
241 changes: 241 additions & 0 deletions packages/modules/vehicles/vwid/libskoda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# A Python class to communicate with the "Skoda Connect" API.
# Adapted the libvwid.py module to skoda interface

import secrets
import logging
import json
import uuid
import base64
import hashlib

from helpermodules.utils.error_handling import ImportErrorContext
with ImportErrorContext():
import lxml.html

# Constants
LOGIN_BASE = "https://identity.vwgroup.io/oidc/v1"
LOGIN_HANDLER_BASE = "https://identity.vwgroup.io"
API_BASE = "https://mysmob.api.connect.skoda-auto.cz/api"
CLIENT_ID = "7f045eee-7003-4379-9968-9355ed2adb06@apps_vw-dilab_com"


class vwid:
def __init__(self, session):
self.session = session
self.headers = {}
self.log = logging.getLogger(__name__)
self.jobs_string = 'all'

def form_from_response(self, text):
page = lxml.html.fromstring(text)
elements = page.xpath('//form//input[@type="hidden"]')
form = {x.attrib['name']: x.attrib['value'] for x in elements}
return (form, page.forms[0].action)

def password_form(self, text):
page = lxml.html.fromstring(text)
elements = page.xpath('//script')

# Todo: Find more elegant way parse this...
objects = {}
for a in elements:
if (a.text) and (a.text.find('window._IDK') != -1):
text = a.text.strip()
text = text[text.find('\n'):text.rfind('\n')].strip()
for line in text.split('\n'):
try:
(name, val) = line.strip().split(':', 1)
except ValueError:
continue
val = val.strip('\', ')
objects[name] = val

json_model = json.loads(objects['templateModel'])

if ('errorCode' in json_model):
self.log.error("Login error: %s", json_model['errorCode'])
return False

try:
# Generate form
form = {}
form['relayState'] = json_model['relayState']
form['hmac'] = json_model['hmac']
form['email'] = json_model['emailPasswordForm']['email']
form['_csrf'] = objects['csrf_token']

# Generate URL action
action = '/signin-service/v1/%s/%s'\
% (json_model['clientLegalEntityModel']['clientId'], json_model['postAction'])

return (form, action)

except KeyError:
self.log.exception("Missing fields in response from VW API")
return False

def set_vin(self, vin):
self.vin = vin

def set_credentials(self, username, password):
self.username = username
self.password = password

def set_jobs(self, jobs):
self.jobs_string = ','.join(jobs)

def get_code_challenge(self):
code_verifier = secrets.token_urlsafe(64).replace('+', '-').replace('/', '_').replace('=', '')
code_challenge = base64.b64encode(hashlib.sha256(code_verifier.encode('utf-8')).digest())
code_challenge = code_challenge.decode('utf-8').replace('+', '-').replace('/', '_').replace('=', '')
return (code_verifier, code_challenge)

async def connect(self, username, password):
self.set_credentials(username, password)
return (await self.reconnect())

async def reconnect(self):
# Get code challenge and verifier
code_verifier, code_challenge = self.get_code_challenge()

# Get authorize page
_scope = 'address badge birthdate cars driversLicense dealers email mileage mbb nationalIdentifier'
_scope = _scope + ' openid phone profession profile vin'
payload = {
'client_id': CLIENT_ID,
'scope': _scope,
'response_type': 'code id_token',
'nonce': secrets.token_urlsafe(12),
'redirect_uri': 'myskoda://redirect/login/',
'state': str(uuid.uuid4()),
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}

response = await self.session.get(LOGIN_BASE + '/authorize', params=payload)
if response.status >= 400:
self.log.error(f"Authorize: Non-2xx response ({response.status})")
# Non 2xx response, failed
return False

# Fill form with email (username)
(form, action) = self.form_from_response(await response.read())
form['email'] = self.username
response = await self.session.post(LOGIN_HANDLER_BASE+action, data=form)
if response.status >= 400:
self.log.error("Email: Non-2xx response")
return False

# Fill form with password
(form, action) = self.password_form(await response.read())
form['password'] = self.password
url = LOGIN_HANDLER_BASE + action
response = await self.session.post(url, data=form, allow_redirects=False)

# Can get a 303 redirect for a "terms and conditions" page
if (response.status == 303):
url = response.headers['Location']
if ("terms-and-conditions" in url):
# Get terms and conditions page
url = LOGIN_HANDLER_BASE + url
response = await self.session.get(url, data=form, allow_redirects=False)
(form, action) = self.form_from_response(await response.read())

url = LOGIN_HANDLER_BASE + action
response = await self.session.post(url, data=form, allow_redirects=False)

self.log.warn("Agreed to terms and conditions")
else:
self.log.error("Got unknown 303 redirect")
return False

# Handle every single redirect and stop if the redirect
# URL uses the weconnect adapter.
while (True):
url = response.headers['Location']
if (url.split(':')[0] == "myskoda"):
if not ('id_token' in url):
self.log.error("Missing id token")
return False
# Parse query string
query_string = url.split('#')[1]
query = {x[0]: x[1] for x in [x.split("=") for x in query_string.split("&")]}
break

if (response.status != 302):
self.log.error("Not redirected, status %u" % response.status)
return False

response = await self.session.get(url, data=form, allow_redirects=False)

self.headers = dict(response.headers)

# Get final token
params = {
'tokenType': 'CONNECT'
}
payload = {
'code': query['code'],
'redirectUri': "myskoda://redirect/login/",
'verifier': code_verifier
}
response = await self.session.post(API_BASE + '/v1/authentication/exchange-authorization-code',
params=params, json=payload)
if response.status >= 400:
self.log.error("Login: Non-2xx response")
# Non 2xx response, failed
return False
self.tokens = await response.json()

# Update header with final token
self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"]

# Success
return True

async def refresh_tokens(self):
if not self.headers:
return False

params = {
'tokenType': 'CONNECT'
}
# Use the refresh token
payload = {
'token': self.tokens["refreshToken"]
}

response = await self.session.post(API_BASE + '/v1/authentication/refresh-token', params=params, json=payload)
if response.status >= 400:
return False
self.tokens = await response.json()

# Use the newly received access token
self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"]

return True

async def get_status(self):
status_url = f"{API_BASE}/v2/vehicle-status/{self.vin}/driving-range"
response = await self.session.get(status_url, headers=self.headers)

# If first attempt fails, try to refresh tokens
if response.status >= 400:
self.log.debug("Refreshing tokens")
if await self.refresh_tokens():
response = await self.session.get(status_url, headers=self.headers)

# If refreshing tokens failed, try a full reconnect
if response.status >= 400:
self.log.info("Reconnecting")
if await self.reconnect():
response = await self.session.get(status_url, headers=self.headers)
else:
self.log.error("Reconnect failed")
return {}

if response.status >= 400:
self.log.error("Get status failed")
return {}

return (await response.json())