Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 72 additions & 9 deletions carelink_carepartner_api_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ def do_captcha(url, redirect_url):
if "location" in request.response.headers:
location = request.response.headers["location"]
if redirect_url in location:
code = re.search(r"code=(.*)&", location).group(1)
state = re.search(r"state=(.*)", location).group(1)
code = re.search(r"code=(.*)", location).group(1)
state = None
if "state=" in location:
state = re.search(r"state=(.*)", location).group(1)
driver.quit()
return (code, state)
sleep(0.1)
Expand All @@ -125,27 +127,38 @@ def resolve_endpoint_config(discovery_url, is_us_region=False):
discover_resp = json.loads(requests.get(discovery_url).text)
sso_url = None

is_auth0 = False

for c in discover_resp["CP"]:
if c['region'].lower() == "us" and is_us_region:
sso_url = c['SSOConfiguration']
key = c['UseSSOConfiguration']
sso_url = c[key]
if "Auth0" in key:
is_auth0 = True
elif c['region'].lower() == "eu" and not is_us_region:
sso_url = c['SSOConfiguration']
key = c['UseSSOConfiguration']
sso_url = c[key]
if "Auth0" in key:
is_auth0 = True

if sso_url is None:
raise Exception("Could not get SSO config url")

sso_config = json.loads(requests.get(sso_url).text)
api_base_url = f"https://{sso_config['server']['hostname']}:{sso_config['server']['port']}/{sso_config['server']['prefix']}"
return sso_config, api_base_url
if api_base_url.endswith('/'):
api_base_url = api_base_url[:-1]
return sso_config, api_base_url, is_auth0

def write_datafile(obj, filename):
print("wrote data file")
with open(filename, 'w') as f:
json.dump(obj, f, indent=4)

def do_login(endpoint_config):
sso_config, api_base_url = endpoint_config
# step 1 initialize
def do_login_non_auth0(endpoint_config):
sso_config, api_base_url, is_auth0 = endpoint_config

# step 1 initialize
data = {
'client_id': sso_config['oauth']['client']['client_ids'][0]['client_id'],
"nonce" : random_uuid()
Expand Down Expand Up @@ -241,6 +254,56 @@ def do_login(endpoint_config):
write_datafile(token_data, logindata_file)
return token_data

def do_login_auth0(endpoint_config):
sso_config, api_base_url, is_auth0 = endpoint_config
auth_params = {
'client_id': sso_config['client']['client_id'],
'response_type' : 'code',
'scope': sso_config["client"]["scope"],
'redirect_uri': sso_config["client"]['redirect_uri'],
'audience': sso_config["client"]["audience"]
}
authorize_url = (
api_base_url + sso_config["system_endpoints"]["authorization_endpoint_path"]
)
captcha_url = f"{authorize_url}?{'&'.join(f'{key}={value}' for key, value in auth_params.items())}"
captcha_code, captcha_sso_state = do_captcha(
captcha_url, sso_config["client"]["redirect_uri"]
)

token_req_url = api_base_url + sso_config["system_endpoints"]["token_endpoint_path"]
token_req_data = {
"grant_type": "authorization_code",
"client_id": sso_config["client"]["client_id"],
"code": captcha_code,
"redirect_uri": sso_config["client"]["redirect_uri"],
}
token_req = requests.post(token_req_url, data=token_req_data)
if token_req.status_code != 200:
print(f"\n\n{curlify.to_curl(token_req.request)}")
print(token_req.text)
raise Exception("Could not get token data")

token_data = json.loads(token_req.text)
print(f"got token data from server")

print(token_data)

token_data["client_id"] = token_req_data["client_id"]
del token_data["expires_in"]
del token_data["token_type"]

write_datafile(token_data, logindata_file)
return token_data

def do_login(endpoint_config):
sso_config, api_base_url, is_auth0 = endpoint_config

if is_auth0:
return do_login_auth0(endpoint_config)
else:
return do_login_non_auth0(endpoint_config)

def read_data_file(file):
token_data = None
if os.path.isfile(file):
Expand All @@ -260,7 +323,7 @@ def read_data_file(file):
# config
is_debug = False
logindata_file = 'logindata.json'
discovery_url = 'https://clcloud.minimed.eu/connect/carepartner/v11/discover/android/3.2'
discovery_url = 'https://clcloud.minimed.eu/connect/carepartner/v13/discover/android/3.6'
rsa_keysize = 2048

def main(is_us_region):
Expand Down
28 changes: 18 additions & 10 deletions carelink_client2.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

# Constants
DEFAULT_FILENAME="logindata.json"
CARELINK_CONFIG_URL = "https://clcloud.minimed.eu/connect/carepartner/v11/discover/android/3.3"
CARELINK_CONFIG_URL = "https://clcloud.minimed.eu/connect/carepartner/v13/discover/android/3.6"
AUTH_ERROR_CODES = [401,403]
COMMON_HEADERS = {
"Accept": "application/json",
Expand Down Expand Up @@ -114,7 +114,7 @@ def _read_token_file(self, filename):
log.error("ERROR: failed parsing token file %s" % filename)

if token_data is not None:
required_fields = ["access_token", "refresh_token", "scope", "client_id", "client_secret", "mag-identifier"]
required_fields = ["access_token", "refresh_token", "scope", "client_id"]
for f in required_fields:
if f not in token_data:
log.error("ERROR: field %s is missing from token file" % f)
Expand Down Expand Up @@ -158,12 +158,16 @@ def _get_config(self, discovery_url, country):
if config is None:
raise Exception("ERROR: failed to get config base urls for region %s" % region)

resp = requests.get(config["SSOConfiguration"])
sso_configuration_key = config["UseSSOConfiguration"]
resp = requests.get(config[sso_configuration_key])
log.debug(" status: %d" % resp.status_code)
sso_config = resp.json()
sso_base_url = "https://%s:%d/%s" % (sso_config["server"]["hostname"],
sso_config["server"]["port"],
sso_config["server"]["prefix"])
if sso_base_url.endswith('/'):
sso_base_url = sso_base_url[:-1] # remove trailing slash if prefix is empty

token_url = sso_base_url + sso_config["system_endpoints"]["token_endpoint_path"]
c["token_url"] = token_url
return config
Expand All @@ -175,7 +179,8 @@ def _get_user(self, config, token_data):
log.info("_get_user()")
url = config["baseUrlCareLink"] + "/users/me"
headers = COMMON_HEADERS
headers["mag-identifier"] = token_data["mag-identifier"]
if "mag-identifier" in token_data:
headers["mag-identifier"] = token_data["mag-identifier"]
headers["Authorization"] = "Bearer " + token_data["access_token"]
self.__last_api_status = None
resp = requests.get(url=url,headers=headers)
Expand All @@ -194,7 +199,8 @@ def _get_patient(self, config, token_data):
log.info("_get_patient()")
url = config["baseUrlCareLink"] + "/links/patients"
headers = COMMON_HEADERS
headers["mag-identifier"] = token_data["mag-identifier"]
if "mag-identifier" in token_data:
headers["mag-identifier"] = token_data["mag-identifier"]
headers["Authorization"] = "Bearer " + token_data["access_token"]
self.__last_api_status = None
resp = requests.get(url=url,headers=headers)
Expand All @@ -213,7 +219,8 @@ def _get_data(self, config, token_data, username, role, patientid):
log.info("_get_data()")
url = config["baseUrlCumulus"] + "/display/message"
headers = COMMON_HEADERS
headers["mag-identifier"] = token_data["mag-identifier"]
if "mag-identifier" in token_data:
headers["mag-identifier"] = token_data["mag-identifier"]
headers["Authorization"] = "Bearer " + token_data["access_token"]
data = {}
data["username"] = username
Expand Down Expand Up @@ -245,12 +252,13 @@ def _do_refresh(self, config, token_data):
data = {
"refresh_token": token_data["refresh_token"],
"client_id": token_data["client_id"],
"client_secret": token_data["client_secret"],
"grant_type": "refresh_token"
}
headers = {
"mag-identifier": token_data["mag-identifier"]
}
if "client_secret" in token_data:
data["client_secret"] = token_data["client_secret"]
headers = {}
if "mag-identifier" in token_data:
headers["mag-identifier"] = token_data["mag-identifier"]
resp = requests.post(url=token_url, headers=headers, data=data)
log.debug(" status: %d" % resp.status_code)
if resp.status_code != 200:
Expand Down