diff --git a/carelink_carepartner_api_login.py b/carelink_carepartner_api_login.py index 42f3212..938b440 100644 --- a/carelink_carepartner_api_login.py +++ b/carelink_carepartner_api_login.py @@ -115,8 +115,10 @@ def do_captcha(url, redirect_url): if "location" in request.response.headers: location = request.response.headers["location"] if redirect_url in location: - code = re.search(r"code=(.*)&", location).group(1) - state = re.search(r"state=(.*)", location).group(1) + code = re.search(r"code=(.*)", location).group(1) + state = None + if "state=" in location: + state = re.search(r"state=(.*)", location).group(1) driver.quit() return (code, state) sleep(0.1) @@ -125,27 +127,38 @@ def resolve_endpoint_config(discovery_url, is_us_region=False): discover_resp = json.loads(requests.get(discovery_url).text) sso_url = None + is_auth0 = False + for c in discover_resp["CP"]: if c['region'].lower() == "us" and is_us_region: - sso_url = c['SSOConfiguration'] + key = c['UseSSOConfiguration'] + sso_url = c[key] + if "Auth0" in key: + is_auth0 = True elif c['region'].lower() == "eu" and not is_us_region: - sso_url = c['SSOConfiguration'] + key = c['UseSSOConfiguration'] + sso_url = c[key] + if "Auth0" in key: + is_auth0 = True if sso_url is None: raise Exception("Could not get SSO config url") sso_config = json.loads(requests.get(sso_url).text) api_base_url = f"https://{sso_config['server']['hostname']}:{sso_config['server']['port']}/{sso_config['server']['prefix']}" - return sso_config, api_base_url + if api_base_url.endswith('/'): + api_base_url = api_base_url[:-1] + return sso_config, api_base_url, is_auth0 def write_datafile(obj, filename): print("wrote data file") with open(filename, 'w') as f: json.dump(obj, f, indent=4) -def do_login(endpoint_config): - sso_config, api_base_url = endpoint_config - # step 1 initialize +def do_login_non_auth0(endpoint_config): + sso_config, api_base_url, is_auth0 = endpoint_config + + # step 1 initialize data = { 'client_id': sso_config['oauth']['client']['client_ids'][0]['client_id'], "nonce" : random_uuid() @@ -241,6 +254,56 @@ def do_login(endpoint_config): write_datafile(token_data, logindata_file) return token_data +def do_login_auth0(endpoint_config): + sso_config, api_base_url, is_auth0 = endpoint_config + auth_params = { + 'client_id': sso_config['client']['client_id'], + 'response_type' : 'code', + 'scope': sso_config["client"]["scope"], + 'redirect_uri': sso_config["client"]['redirect_uri'], + 'audience': sso_config["client"]["audience"] + } + authorize_url = ( + api_base_url + sso_config["system_endpoints"]["authorization_endpoint_path"] + ) + captcha_url = f"{authorize_url}?{'&'.join(f'{key}={value}' for key, value in auth_params.items())}" + captcha_code, captcha_sso_state = do_captcha( + captcha_url, sso_config["client"]["redirect_uri"] + ) + + token_req_url = api_base_url + sso_config["system_endpoints"]["token_endpoint_path"] + token_req_data = { + "grant_type": "authorization_code", + "client_id": sso_config["client"]["client_id"], + "code": captcha_code, + "redirect_uri": sso_config["client"]["redirect_uri"], + } + token_req = requests.post(token_req_url, data=token_req_data) + if token_req.status_code != 200: + print(f"\n\n{curlify.to_curl(token_req.request)}") + print(token_req.text) + raise Exception("Could not get token data") + + token_data = json.loads(token_req.text) + print(f"got token data from server") + + print(token_data) + + token_data["client_id"] = token_req_data["client_id"] + del token_data["expires_in"] + del token_data["token_type"] + + write_datafile(token_data, logindata_file) + return token_data + +def do_login(endpoint_config): + sso_config, api_base_url, is_auth0 = endpoint_config + + if is_auth0: + return do_login_auth0(endpoint_config) + else: + return do_login_non_auth0(endpoint_config) + def read_data_file(file): token_data = None if os.path.isfile(file): @@ -260,7 +323,7 @@ def read_data_file(file): # config is_debug = False logindata_file = 'logindata.json' -discovery_url = 'https://clcloud.minimed.eu/connect/carepartner/v11/discover/android/3.2' +discovery_url = 'https://clcloud.minimed.eu/connect/carepartner/v13/discover/android/3.6' rsa_keysize = 2048 def main(is_us_region): diff --git a/carelink_client2.py b/carelink_client2.py index b2ea543..988d760 100644 --- a/carelink_client2.py +++ b/carelink_client2.py @@ -58,7 +58,7 @@ # Constants DEFAULT_FILENAME="logindata.json" -CARELINK_CONFIG_URL = "https://clcloud.minimed.eu/connect/carepartner/v11/discover/android/3.3" +CARELINK_CONFIG_URL = "https://clcloud.minimed.eu/connect/carepartner/v13/discover/android/3.6" AUTH_ERROR_CODES = [401,403] COMMON_HEADERS = { "Accept": "application/json", @@ -114,7 +114,7 @@ def _read_token_file(self, filename): log.error("ERROR: failed parsing token file %s" % filename) if token_data is not None: - required_fields = ["access_token", "refresh_token", "scope", "client_id", "client_secret", "mag-identifier"] + required_fields = ["access_token", "refresh_token", "scope", "client_id"] for f in required_fields: if f not in token_data: log.error("ERROR: field %s is missing from token file" % f) @@ -158,12 +158,16 @@ def _get_config(self, discovery_url, country): if config is None: raise Exception("ERROR: failed to get config base urls for region %s" % region) - resp = requests.get(config["SSOConfiguration"]) + sso_configuration_key = config["UseSSOConfiguration"] + resp = requests.get(config[sso_configuration_key]) log.debug(" status: %d" % resp.status_code) sso_config = resp.json() sso_base_url = "https://%s:%d/%s" % (sso_config["server"]["hostname"], sso_config["server"]["port"], sso_config["server"]["prefix"]) + if sso_base_url.endswith('/'): + sso_base_url = sso_base_url[:-1] # remove trailing slash if prefix is empty + token_url = sso_base_url + sso_config["system_endpoints"]["token_endpoint_path"] c["token_url"] = token_url return config @@ -175,7 +179,8 @@ def _get_user(self, config, token_data): log.info("_get_user()") url = config["baseUrlCareLink"] + "/users/me" headers = COMMON_HEADERS - headers["mag-identifier"] = token_data["mag-identifier"] + if "mag-identifier" in token_data: + headers["mag-identifier"] = token_data["mag-identifier"] headers["Authorization"] = "Bearer " + token_data["access_token"] self.__last_api_status = None resp = requests.get(url=url,headers=headers) @@ -194,7 +199,8 @@ def _get_patient(self, config, token_data): log.info("_get_patient()") url = config["baseUrlCareLink"] + "/links/patients" headers = COMMON_HEADERS - headers["mag-identifier"] = token_data["mag-identifier"] + if "mag-identifier" in token_data: + headers["mag-identifier"] = token_data["mag-identifier"] headers["Authorization"] = "Bearer " + token_data["access_token"] self.__last_api_status = None resp = requests.get(url=url,headers=headers) @@ -213,7 +219,8 @@ def _get_data(self, config, token_data, username, role, patientid): log.info("_get_data()") url = config["baseUrlCumulus"] + "/display/message" headers = COMMON_HEADERS - headers["mag-identifier"] = token_data["mag-identifier"] + if "mag-identifier" in token_data: + headers["mag-identifier"] = token_data["mag-identifier"] headers["Authorization"] = "Bearer " + token_data["access_token"] data = {} data["username"] = username @@ -245,12 +252,13 @@ def _do_refresh(self, config, token_data): data = { "refresh_token": token_data["refresh_token"], "client_id": token_data["client_id"], - "client_secret": token_data["client_secret"], "grant_type": "refresh_token" } - headers = { - "mag-identifier": token_data["mag-identifier"] - } + if "client_secret" in token_data: + data["client_secret"] = token_data["client_secret"] + headers = {} + if "mag-identifier" in token_data: + headers["mag-identifier"] = token_data["mag-identifier"] resp = requests.post(url=token_url, headers=headers, data=data) log.debug(" status: %d" % resp.status_code) if resp.status_code != 200: