From e4a5c99bf1c526b3c43a69685ef38889d87fe47e Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Thu, 20 Jun 2024 20:04:42 -0700 Subject: [PATCH 1/8] byte support for upload --- python/fedml/api/modules/storage.py | 66 +++++++++++++++++++---------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 33e781be0..736d5509e 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -30,13 +30,16 @@ def __init__(self, data: dict): class DataType(Enum): FILE = "file" DIRECTORY = "directory" + BYTE = "byte" INVALID = "invalid" + + # Todo (alaydshah): Store service name in metadata # Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later. def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc, - metadata) -> FedMLResponse: + metadata, byte_data_flag=False, byte_data=None) -> FedMLResponse: api_key = authenticate(api_key) user_id, message = _get_user_id_from_api_key(api_key) @@ -44,12 +47,12 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - data_type = _get_data_type(data_path) + data_type = _get_data_type(data_path, byte_data_flag) - if(data_type == DataType.INVALID): + if data_type == DataType.INVALID: return FedMLResponse(code=ResponseCode.FAILURE,message="Invalid data path") - if(data_type == DataType.DIRECTORY): + if data_type == DataType.DIRECTORY: to_upload_path, message = _archive_data(data_path) name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name file_name = name + ".zip" @@ -67,18 +70,24 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre file_name = name - if not to_upload_path: + if not to_upload_path and not byte_data_flag: return FedMLResponse(code=ResponseCode.FAILURE, message=message) #TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. dest_path = os.path.join(user_id, file_name) - file_size = os.path.getsize(to_upload_path) + max_chunk_size = 20 * 1024 * 1024 + + if byte_data_flag: + file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) - file_uploaded_url, message = _upload_multipart(api_key, dest_path, to_upload_path, show_progress, + else: + file_size = os.path.getsize(to_upload_path) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, to_upload_path, show_progress, out_progress_to_err, - progress_desc, metadata) + progress_desc, metadata, byte_data_flag, byte_data) - if(data_type == "dir"): + if data_type == "dir": os.remove(to_upload_path) if not file_uploaded_url: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {to_upload_path}") @@ -262,6 +271,13 @@ def get_chunks(file_path, chunk_size): break yield chunk +def get_chunks_from_byte_data(byte_data, chunk_size): + while True: + chunk = byte_data.read(chunk_size) + if not chunk: + break + yield chunk + def _get_presigned_url(api_key, request_url, file_name, part_number=None): cert_path = MLOpsConfigs.get_cert_path_with_version() @@ -287,7 +303,7 @@ def _upload_part(url,part_data,session): return response -def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None): +def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None, byte_data_flag= False): for retry_attempt in range(max_retries): try: response = _upload_part(presigned_url,chunk,session) @@ -297,11 +313,12 @@ def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session= else: raise requests.exceptions.RequestException - if(pbar is not None): - pbar.update(chunk.__sizeof__()) + if pbar is not None: + pbar.update(len(chunk)) return {'etag': response.headers['ETag'], 'partNumber': part} raise requests.exceptions.RequestException + def _process_post_response(response): if response.status_code != 200: message = (f"Failed to complete multipart upload with status code = {response.status_code}, " @@ -345,14 +362,10 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id): return _process_post_response(complete_multipart_response) -def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_progress_to_err, - progress_desc_text, metadata): +def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive_path, show_progress, out_progress_to_err, + progress_desc_text, metadata, byte_data_flag, byte_data): request_url = ServerConstants.get_presigned_multi_part_url() - file_size = os.path.getsize(archive_path) - - max_chunk_size = 20 * 1024 * 1024 - num_chunks = _get_num_chunks(file_size, max_chunk_size) upload_id = "" @@ -379,8 +392,12 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p upload_id = data['uploadId'] presigned_urls = data['urls'] - parts = [] - chunks = get_chunks(archive_path, max_chunk_size) + if byte_data_flag: + byte_data.seek(0) + chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) + else: + chunks = get_chunks(archive_path, max_chunk_size) + part_info = [] chunk_count = 0 successful_chunks = 0 @@ -396,7 +413,7 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p if show_progress: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar,session=atomic_session, byte_data_flag = byte_data_flag) part_info.append(part_data) successful_chunks += 1 except Exception as e: @@ -474,8 +491,11 @@ def _get_storage_service(service): else: raise NotImplementedError(f"Service {service} not implemented") -def _get_data_type(data_path): - if os.path.isdir(data_path): + +def _get_data_type(data_path, byte_data_flag): + if byte_data_flag: + return DataType.BYTE + elif os.path.isdir(data_path): return DataType.DIRECTORY elif os.path.isfile(data_path): return DataType.FILE From 8661dca89fc783b357787198dada17abbda89259 Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Fri, 21 Jun 2024 17:41:28 -0700 Subject: [PATCH 2/8] simplified way to upload byte data + encrypted_url + new_backend_apis --- python/fedml/api/modules/storage.py | 218 +++++++++++------- python/fedml/api/modules/utils.py | 12 +- .../scheduler/master/server_constants.py | 6 +- .../scheduler_entry/resource_manager.py | 5 +- 4 files changed, 141 insertions(+), 100 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 736d5509e..657df98e7 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -23,74 +23,40 @@ def __init__(self, data: dict): self.tags = data.get("description", None) self.createdAt = data.get("createTime", None) self.updatedAt = data.get("updateTime", None) - self.size = _get_size(data.get("fileSize",None)) + self.size = _get_size(data.get("fileSize", None)) self.tag_list = data.get("tags", None) self.download_url = data.get("fileUrl", None) + class DataType(Enum): FILE = "file" DIRECTORY = "directory" - BYTE = "byte" INVALID = "invalid" - # Todo (alaydshah): Store service name in metadata # Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later. def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc, - metadata, byte_data_flag=False, byte_data=None) -> FedMLResponse: - api_key = authenticate(api_key) - + metadata, encrypted_api_key_flag=False, byte_data=None) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - data_type = _get_data_type(data_path, byte_data_flag) - - if data_type == DataType.INVALID: - return FedMLResponse(code=ResponseCode.FAILURE,message="Invalid data path") + if byte_data: + file_uploaded_url, message, file_size = _upload_bytes(api_key, user_id, name, show_progress, + out_progress_to_err, progress_desc, metadata, + byte_data, encrypted_api_key_flag=encrypted_api_key_flag) - if data_type == DataType.DIRECTORY: - to_upload_path, message = _archive_data(data_path) - name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name - file_name = name + ".zip" else: - to_upload_path = data_path - base_name = os.path.basename(to_upload_path) - file_extension = os.path.splitext(base_name)[1] - given_extension = None - if name is not None: - given_extension = os.path.splitext(name)[1] - if given_extension is None or given_extension == "": - name = name + file_extension - else: - name = base_name + file_uploaded_url, message, file_size, name = _upload_file(api_key, user_id, name, data_path, show_progress, + out_progress_to_err, progress_desc, metadata, + encrypted_api_key_flag=encrypted_api_key_flag) - file_name = name - - if not to_upload_path and not byte_data_flag: - return FedMLResponse(code=ResponseCode.FAILURE, message=message) - - #TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. - dest_path = os.path.join(user_id, file_name) - max_chunk_size = 20 * 1024 * 1024 - - if byte_data_flag: - file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) - - else: - file_size = os.path.getsize(to_upload_path) - - file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, to_upload_path, show_progress, - out_progress_to_err, - progress_desc, metadata, byte_data_flag, byte_data) - - if data_type == "dir": - os.remove(to_upload_path) if not file_uploaded_url: - return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {to_upload_path}") + return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {data_path}") json_data = { "datasetName": name, @@ -101,7 +67,8 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre } try: - response = _create_dataset(api_key=api_key, json_data=json_data) + response = _create_dataset(api_key=api_key, json_data=json_data, encrypted_api_key_flag=encrypted_api_key_flag) + print("create dataset ", response) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") @@ -112,14 +79,14 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre # Todo(alaydshah): Query service from object metadata -def download(data_name, api_key, service, dest_path, show_progress=True) -> FedMLResponse: - api_key = authenticate(api_key) +def download(data_name, api_key, service, dest_path, show_progress=True, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: return FedMLResponse(code=ResponseCode.FAILURE, message=message) - metadata_response = get_metadata(data_name, api_key) + metadata_response = get_metadata(data_name, api_key, encrypted_api_key_flag=encrypted_api_key_flag) if metadata_response.code == ResponseCode.SUCCESS: metadata = metadata_response.data if not metadata or not isinstance(metadata, StorageMetadata): @@ -129,7 +96,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM download_url = metadata.download_url given_extension = os.path.splitext(data_name)[1] is_file = True - if(given_extension is None or given_extension ==""): + if (given_extension is None or given_extension == ""): is_file = False if not is_file: @@ -146,7 +113,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM else: if not os.path.exists(dest_path): os.makedirs(dest_path) - shutil.move(path_local,dest_path) + shutil.move(path_local, dest_path) abs_dest_path = os.path.abspath(dest_path) return FedMLResponse(code=ResponseCode.SUCCESS, message=f"Successfully downloaded and unzipped data at " f"{abs_dest_path}", data=abs_dest_path) @@ -185,11 +152,12 @@ def get_user_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=ResponseCode.SUCCESS, message=message, data=data) -def get_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _get_dataset_metadata(api_key=api_key, data_name=data_name) + response = _get_dataset_metadata(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to get metadata of '{data_name}' with " @@ -206,10 +174,10 @@ def get_metadata(data_name, api_key=None) -> FedMLResponse: return FedMLResponse(code=code, message=message) -def list_objects(api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def list_objects(api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) try: - response = _list_dataset(api_key=api_key) + response = _list_dataset(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) except Exception as e: message = f"Failed to list stored objects for account linked with api_key {api_key} with exception {e}" logging.error(message) @@ -258,6 +226,7 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: logging.error(message, data_name, service) return FedMLResponse(code=ResponseCode.FAILURE, message=message, data=False) + def _get_num_chunks(file_size, max_chunk_size): num_chunks = math.ceil(file_size / max_chunk_size) return num_chunks @@ -271,6 +240,7 @@ def get_chunks(file_path, chunk_size): break yield chunk + def get_chunks_from_byte_data(byte_data, chunk_size): while True: chunk = byte_data.read(chunk_size) @@ -279,10 +249,11 @@ def get_chunks_from_byte_data(byte_data, chunk_size): yield chunk -def _get_presigned_url(api_key, request_url, file_name, part_number=None): +def _get_presigned_url(api_key, request_url, file_name, part_number=None, encrypted_api_key_flag=False): cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) params_dict = {'fileKey': file_name} if part_number is not None: params_dict['partNumber'] = part_number @@ -298,15 +269,15 @@ def _get_presigned_url(api_key, request_url, file_name, part_number=None): return response -def _upload_part(url,part_data,session): - response = session.put(url,data=part_data,verify=True) +def _upload_part(url, part_data, session): + response = session.put(url, data=part_data, verify=True) return response -def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None, byte_data_flag= False): +def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20, session=None): for retry_attempt in range(max_retries): try: - response = _upload_part(presigned_url,chunk,session) + response = _upload_part(presigned_url, chunk, session) except requests.exceptions.RequestException as e: if retry_attempt < max_retries: continue @@ -339,13 +310,14 @@ def _process_post_response(response): return data_url, "Successfully uploaded the data! " -def _complete_multipart_upload(api_key, file_key, part_info, upload_id): +def _complete_multipart_upload(api_key, file_key, part_info, upload_id, encrypted_api_key_flag=False): complete_multipart_url = ServerConstants.get_complete_multipart_upload_url() body_dict = {"fileKey": file_key, 'partETags': part_info, 'uploadId': upload_id} cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -362,8 +334,9 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id): return _process_post_response(complete_multipart_response) -def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive_path, show_progress, out_progress_to_err, - progress_desc_text, metadata, byte_data_flag, byte_data): +def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, show_progress, + out_progress_to_err, + progress_desc_text, chunks, encrypted_api_key_flag=False): request_url = ServerConstants.get_presigned_multi_part_url() num_chunks = _get_num_chunks(file_size, max_chunk_size) @@ -371,7 +344,8 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive upload_id = "" presigned_urls = [] - presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks) + presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks, + encrypted_api_key_flag=encrypted_api_key_flag) if presigned_url_response.status_code != 200: message = (f"Failed to get presigned URL with status code = {presigned_url_response.status_code}, " @@ -392,12 +366,6 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive upload_id = data['uploadId'] presigned_urls = data['urls'] - if byte_data_flag: - byte_data.seek(0) - chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) - else: - chunks = get_chunks(archive_path, max_chunk_size) - part_info = [] chunk_count = 0 successful_chunks = 0 @@ -413,7 +381,7 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive if show_progress: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session, byte_data_flag = byte_data_flag) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: @@ -422,18 +390,87 @@ def _upload_multipart(api_key: str, file_key, file_size, max_chunk_size, archive else: try: part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part, - pbar=pbar,session=atomic_session) + pbar=pbar, session=atomic_session) part_info.append(part_data) successful_chunks += 1 except Exception as e: return None, "unsuccessful" if successful_chunks == chunk_count: - return _complete_multipart_upload(api_key, file_key, part_info, upload_id) + return _complete_multipart_upload(api_key, file_key, part_info, upload_id, + encrypted_api_key_flag=encrypted_api_key_flag) else: return None, "Unsuccessful!" +def _upload_bytes(api_key, user_id, file_name, + show_progress, out_progress_to_err, progress_desc, metadata, byte_data, encrypted_api_key_flag=False): + if file_name is None: + return None, "name cannot be None" + + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) + + byte_data.seek(0) + chunks = get_chunks_from_byte_data(byte_data, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + return file_uploaded_url, message, file_size + + +def _upload_file(api_key, user_id, name, data_path, show_progress, out_progress_to_err, progress_desc, + metadata, encrypted_api_key_flag=False): + data_type = _get_data_type(data_path) + message = "" + + if data_type == DataType.INVALID: + return FedMLResponse(code=ResponseCode.FAILURE, message="Invalid data path") + + if data_type == DataType.DIRECTORY: + to_upload_path, message = _archive_data(data_path) + name = os.path.splitext(os.path.basename(to_upload_path))[0] if name is None else name + file_name = name + ".zip" + else: + to_upload_path = data_path + base_name = os.path.basename(to_upload_path) + file_extension = os.path.splitext(base_name)[1] + if name is not None: + given_extension = os.path.splitext(name)[1] + if given_extension is None or given_extension == "": + name = name + file_extension + else: + name = base_name + + file_name = name + + if not to_upload_path: + return FedMLResponse(code=ResponseCode.FAILURE, message=message) + + # TODO(bhargav191098) - Better done on the backend. Remove and pass file_name once completed on backend. + dest_path = os.path.join(user_id, file_name) + max_chunk_size = 20 * 1024 * 1024 + + file_size = os.path.getsize(to_upload_path) + + chunks = get_chunks(to_upload_path, max_chunk_size) + + file_uploaded_url, message = _upload_multipart(api_key, dest_path, file_size, max_chunk_size, + show_progress, + out_progress_to_err, + progress_desc, chunks, encrypted_api_key_flag=encrypted_api_key_flag) + + if data_type == DataType.DIRECTORY: + os.remove(to_upload_path) + + return file_uploaded_url, message, file_size, name + + def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progress=True): download_response = requests.get(url, verify=True, stream=True) if download_response.status_code == 200: @@ -456,6 +493,7 @@ def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progr return True return False + def _get_user_id_from_api_key(api_key: str) -> (str, str): user_url = ServerConstants.get_user_url() json_data = { @@ -492,10 +530,8 @@ def _get_storage_service(service): raise NotImplementedError(f"Service {service} not implemented") -def _get_data_type(data_path, byte_data_flag): - if byte_data_flag: - return DataType.BYTE - elif os.path.isdir(data_path): +def _get_data_type(data_path): + if os.path.isdir(data_path): return DataType.DIRECTORY elif os.path.isfile(data_path): return DataType.FILE @@ -517,11 +553,12 @@ def _archive_data(data_path: str) -> (str, str): return None, f"Error archiving data: {e}" -def _create_dataset(api_key: str, json_data: dict) -> requests.Response: +def _create_dataset(api_key: str, json_data: dict, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -540,11 +577,12 @@ def _create_dataset(api_key: str, json_data: dict) -> requests.Response: return response -def _list_dataset(api_key: str) -> requests.Response: +def _list_dataset(api_key: str, encrypted_api_key_flag=False) -> requests.Response: list_dataset_url = ServerConstants.list_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is None: try: requests.session().verify = cert_path @@ -557,11 +595,12 @@ def _list_dataset(api_key: str) -> requests.Response: return response -def _get_dataset_metadata(api_key: str, data_name: str) -> requests.Response: +def _get_dataset_metadata(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_metadata_url = ServerConstants.get_dataset_metadata_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path @@ -624,19 +663,20 @@ def _get_data_from_response(message: str, response: requests.Response) -> (Respo return ResponseCode.SUCCESS, "Successfully parsed data from response", data -def _get_size(size_in_bytes:str)->str: + +def _get_size(size_in_bytes: str) -> str: size_str = "" - if(size_in_bytes): + if (size_in_bytes): size = int(size_in_bytes) size_in_gb = size / (1024 * 1024 * 1024) size_in_mb = size / (1024 * 1024) size_in_kb = size / 1024 - if(size_in_gb >= 1): + if (size_in_gb >= 1): size_str = f"{size_in_gb:.2f} GB" - elif(size_in_mb >= 1): + elif (size_in_mb >= 1): size_str = f"{size_in_mb:.2f} MB" - elif(size_in_kb >= 1): + elif (size_in_kb >= 1): size_str = f"{size_in_kb:.2f} KB" else: size_str = f"{size} B" - return size_str \ No newline at end of file + return size_str diff --git a/python/fedml/api/modules/utils.py b/python/fedml/api/modules/utils.py index 76801ffe8..d1d555d0f 100644 --- a/python/fedml/api/modules/utils.py +++ b/python/fedml/api/modules/utils.py @@ -9,15 +9,15 @@ FEDML_MLOPS_BUILD_PRE_IGNORE_LIST = 'dist-packages,client-package.zip,server-package.zip,__pycache__,*.pyc,*.git' -def fedml_login(api_key): - api_key_is_valid, api_key = _check_api_key(api_key=api_key) +def fedml_login(api_key, encrypted_api_key_flag=False): + api_key_is_valid, api_key = _check_api_key(api_key=api_key, encrypted_api_key_flag=encrypted_api_key_flag) if api_key_is_valid: return 0, api_key return -1, api_key -def _check_api_key(api_key=None): +def _check_api_key(api_key=None, encrypted_api_key_flag=False): if api_key is None or api_key == "": saved_api_key = get_api_key() if saved_api_key is None or saved_api_key == "": @@ -25,7 +25,7 @@ def _check_api_key(api_key=None): else: api_key = saved_api_key - is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key) + is_valid_heartbeat = FedMLResourceManager.get_instance().check_heartbeat(api_key, encrypted_api_key_flag) if not is_valid_heartbeat: return False, api_key else: @@ -33,9 +33,9 @@ def _check_api_key(api_key=None): return True, api_key -def authenticate(api_key): +def authenticate(api_key, encrypted_api_key_flag=False): - error_code, api_key = fedml_login(api_key) + error_code, api_key = fedml_login(api_key, encrypted_api_key_flag) # Exit if not able to authenticate successfully if error_code: diff --git a/python/fedml/computing/scheduler/master/server_constants.py b/python/fedml/computing/scheduler/master/server_constants.py index ebd8b2aef..2c59fbedf 100644 --- a/python/fedml/computing/scheduler/master/server_constants.py +++ b/python/fedml/computing/scheduler/master/server_constants.py @@ -251,7 +251,7 @@ def get_user_url(): @staticmethod def get_dataset_url(): - create_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset".format( + create_dataset_url = "{}/system/api/v1/cli/storage".format( ServerConstants.get_mlops_url()) return create_dataset_url @@ -271,13 +271,13 @@ def get_complete_multipart_upload_url(): @staticmethod def list_dataset_url(): - list_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset/list".format( + list_dataset_url = "{}/system/api/v1/cli/storage/list".format( ServerConstants.get_mlops_url()) return list_dataset_url @staticmethod def get_dataset_metadata_url(): - get_dataset_metadata_url = "{}/fedmlOpsServer/api/v1/cli/dataset/meta".format( + get_dataset_metadata_url = "{}/system/api/v1/cli/storage/meta".format( ServerConstants.get_mlops_url()) return get_dataset_metadata_url diff --git a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py index 615b77041..3d910da3a 100644 --- a/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py +++ b/python/fedml/computing/scheduler/scheduler_entry/resource_manager.py @@ -15,9 +15,10 @@ def __init__(self): def get_instance(): return FedMLResourceManager() - def check_heartbeat(self, api_key): + def check_heartbeat(self, api_key, encrypted_api_key_flag=False): heartbeat_url = ServerConstants.get_heartbeat_url() - heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close'} + heartbeat_api_headers = {'Content-Type': 'application/json', 'Connection': 'close', + 'Encrypted': str(encrypted_api_key_flag)} heartbeat_json = { "apiKey": api_key } From c00ac6a2ecef124bfbfb1967569e1ec3b9984529 Mon Sep 17 00:00:00 2001 From: bhargav191098 Date: Fri, 21 Jun 2024 17:49:07 -0700 Subject: [PATCH 3/8] some storage function missed flag - all updated --- python/fedml/api/modules/storage.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 657df98e7..5bf64a441 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -132,8 +132,8 @@ def download(data_name, api_key, service, dest_path, show_progress=True, encrypt return FedMLResponse(code=ResponseCode.FAILURE, message=error_message) -def get_user_metadata(data_name, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def get_user_metadata(data_name, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -195,8 +195,8 @@ def list_objects(api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: # Todo(alaydshah): Query service from object metadata. Make the transaction atomic or rollback if partially failed -def delete(data_name, service, api_key=None) -> FedMLResponse: - api_key = authenticate(api_key) +def delete(data_name, service, api_key=None, encrypted_api_key_flag=False) -> FedMLResponse: + api_key = authenticate(api_key, encrypted_api_key_flag=encrypted_api_key_flag) user_id, message = _get_user_id_from_api_key(api_key) if user_id is None: @@ -209,7 +209,8 @@ def delete(data_name, service, api_key=None) -> FedMLResponse: if result: logging.info(f"Successfully deleted object from storage service.") try: - response = _delete_dataset(api_key=api_key, data_name=data_name) + response = _delete_dataset(api_key=api_key, data_name=data_name, + encrypted_api_key_flag=encrypted_api_key_flag) code, message, data = _get_data_from_response(message="Failed to delete data", response=response) except Exception as e: message = (f"Deleted object from storage service but failed to delete object metadata from Nexus Backend " @@ -622,11 +623,12 @@ def _get_dataset_metadata(api_key: str, data_name: str, encrypted_api_key_flag=F return response -def _delete_dataset(api_key: str, data_name: str) -> requests.Response: +def _delete_dataset(api_key: str, data_name: str, encrypted_api_key_flag=False) -> requests.Response: dataset_url = ServerConstants.get_dataset_url() cert_path = MLOpsConfigs.get_cert_path_with_version() headers = ServerConstants.API_HEADERS headers["Authorization"] = f"Bearer {api_key}" + headers["Encrypted"] = str(encrypted_api_key_flag) if cert_path is not None: try: requests.session().verify = cert_path From 0af69f45484dbcd91784de3a87350cbdb444b51d Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Sun, 23 Jun 2024 12:16:56 -0700 Subject: [PATCH 4/8] Update decorators, add timeit --- .../scheduler/master/base_master_job_runner.py | 2 +- .../fedml/utils/{debugging.py => decorators.py} | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) rename python/fedml/utils/{debugging.py => decorators.py} (74%) diff --git a/python/fedml/computing/scheduler/master/base_master_job_runner.py b/python/fedml/computing/scheduler/master/base_master_job_runner.py index 9ebab258b..7dbfb225f 100755 --- a/python/fedml/computing/scheduler/master/base_master_job_runner.py +++ b/python/fedml/computing/scheduler/master/base_master_job_runner.py @@ -16,7 +16,7 @@ from ....core.mlops.mlops_utils import MLOpsUtils from ..scheduler_core.log_manager import LogsManager from ..scheduler_core.metrics_manager import MetricsManager -from fedml.utils.debugging import debug +from fedml.utils.decorators import debug from ..scheduler_core.status_center import JobStatus from ..scheduler_core.compute_cache_manager import ComputeCacheManager from multiprocessing import Process, Queue diff --git a/python/fedml/utils/debugging.py b/python/fedml/utils/decorators.py similarity index 74% rename from python/fedml/utils/debugging.py rename to python/fedml/utils/decorators.py index 9c43290c9..8454818e2 100644 --- a/python/fedml/utils/debugging.py +++ b/python/fedml/utils/decorators.py @@ -1,9 +1,10 @@ from typing import Callable import os +import time import functools -def debug(_func: Callable=None, *, output_file="output.txt"): +def debug(_func: Callable = None, *, output_file="output.txt"): def decorator(func: Callable): @functools.wraps(func) @@ -36,3 +37,16 @@ def wrapper(*args, **kwargs): return decorator return decorator(_func) + +def timeit(func: Callable): + """Print the runtime of the decorated function""" + functools.wraps(func) + + def wrapper(*args, **kwargs): + start = time.perf_counter() + func(*args, **kwargs) + end = time.perf_counter() + run_time = end - start + print(f"Finished {func.__name__!r} in {run_time:.4f} seconds") + + return wrapper From 809c52d0f932b0f1a6fbe1bb1d7f7531bdac3738 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Sun, 23 Jun 2024 13:00:53 -0700 Subject: [PATCH 5/8] Bug fix --- python/fedml/utils/decorators.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/fedml/utils/decorators.py b/python/fedml/utils/decorators.py index 8454818e2..db73ff7d3 100644 --- a/python/fedml/utils/decorators.py +++ b/python/fedml/utils/decorators.py @@ -38,15 +38,17 @@ def wrapper(*args, **kwargs): return decorator(_func) + def timeit(func: Callable): """Print the runtime of the decorated function""" functools.wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() - func(*args, **kwargs) + value = func(*args, **kwargs) end = time.perf_counter() run_time = end - start print(f"Finished {func.__name__!r} in {run_time:.4f} seconds") + return value return wrapper From 917c46980d81b735a03859c7ea15f48e46c223e4 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Sun, 23 Jun 2024 17:52:49 -0700 Subject: [PATCH 6/8] Add fedml response to init --- python/fedml/api/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/fedml/api/__init__.py b/python/fedml/api/__init__.py index f753e4255..a1a3ef5fb 100755 --- a/python/fedml/api/__init__.py +++ b/python/fedml/api/__init__.py @@ -27,6 +27,7 @@ from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants +from .fedml_response import FedMLResponse def fedml_login(api_key: str = None): """ From c8d90fd1e043d360ca2af6c5f3c162d33fdbd3f5 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Sun, 23 Jun 2024 17:52:49 -0700 Subject: [PATCH 7/8] Add fedml response to init --- python/fedml/api/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/fedml/api/__init__.py b/python/fedml/api/__init__.py index f753e4255..cdce1058b 100755 --- a/python/fedml/api/__init__.py +++ b/python/fedml/api/__init__.py @@ -27,6 +27,7 @@ from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants +from .fedml_response import ResponseCode def fedml_login(api_key: str = None): """ From e536d9b796449bdbeeea3d0fbcc44b3ec072252c Mon Sep 17 00:00:00 2001 From: alaydshah Date: Mon, 24 Jun 2024 05:15:14 +0000 Subject: [PATCH 8/8] Storage hotfixes --- python/fedml/api/__init__.py | 6 ++++-- python/fedml/api/modules/storage.py | 5 ++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/python/fedml/api/__init__.py b/python/fedml/api/__init__.py index cdce1058b..052928774 100755 --- a/python/fedml/api/__init__.py +++ b/python/fedml/api/__init__.py @@ -13,6 +13,7 @@ print(f"run status {run_log_result.run_status}, total log nums {log_result.total_log_lines}, " f"total log pages {log_result.total_log_pages}, log list {log_result.log_line_list}") """ +from io import BytesIO from typing import List, Tuple from fedml.api.constants import RunStatus @@ -183,10 +184,11 @@ def cluster_killall(api_key=None) -> bool: def upload(data_path, api_key=None, tag_list=[], service="R2", name=None, description=None, metadata=None, show_progress=False, - out_progress_to_err=True, progress_desc=None) -> FedMLResponse: + out_progress_to_err=True, progress_desc=None, byte_data: BytesIO = None, encrypted_api_key_flag=False) -> FedMLResponse: return storage.upload(data_path=data_path, api_key=api_key, name=name, description=description, tag_list =tag_list, service=service, progress_desc=progress_desc, show_progress=show_progress, - out_progress_to_err=out_progress_to_err, metadata=metadata) + out_progress_to_err=out_progress_to_err, metadata=metadata, byte_data=byte_data, + encrypted_api_key_flag=encrypted_api_key_flag) def get_storage_user_defined_metadata(data_name, api_key=None) -> FedMLResponse: return storage.get_user_metadata(data_name=data_name, api_key=api_key) diff --git a/python/fedml/api/modules/storage.py b/python/fedml/api/modules/storage.py index 5bf64a441..0ff64c00e 100644 --- a/python/fedml/api/modules/storage.py +++ b/python/fedml/api/modules/storage.py @@ -68,7 +68,6 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre try: response = _create_dataset(api_key=api_key, json_data=json_data, encrypted_api_key_flag=encrypted_api_key_flag) - print("create dataset ", response) code, message, data = _get_data_from_response(message="Failed to upload data", response=response) except Exception as e: return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to create dataset: {e}") @@ -96,7 +95,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True, encrypt download_url = metadata.download_url given_extension = os.path.splitext(data_name)[1] is_file = True - if (given_extension is None or given_extension == ""): + if not given_extension: is_file = False if not is_file: @@ -411,7 +410,7 @@ def _upload_bytes(api_key, user_id, file_name, dest_path = os.path.join(user_id, file_name) max_chunk_size = 20 * 1024 * 1024 - + byte_data.seek(0) file_size = sum(len(chunk) for chunk in get_chunks_from_byte_data(byte_data, max_chunk_size)) byte_data.seek(0)