From df97eb7ed13ad1700544cebacd2c5ed885267980 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Wed, 3 Sep 2025 13:35:32 +0100 Subject: [PATCH 1/5] Update logger, exeception --- plugin/cloudsmith_repository.py | 266 ++++++++++++++------------------ 1 file changed, 116 insertions(+), 150 deletions(-) diff --git a/plugin/cloudsmith_repository.py b/plugin/cloudsmith_repository.py index 6ebd2a8..d99110a 100644 --- a/plugin/cloudsmith_repository.py +++ b/plugin/cloudsmith_repository.py @@ -59,18 +59,16 @@ def __init__(self, artifact_uri, tracking_uri=None): self.api_key = os.getenv("CLOUDSMITH_API_KEY") # Ensure logger outputs when debug is enabled - if self.debug_mode: - if not _logger.handlers: - handler = logging.StreamHandler() - handler.setFormatter( - logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) + if not _logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) - _logger.addHandler(handler) - _logger.setLevel(logging.INFO) - _logger.propagate = True - _logger.info("Cloudsmith: Initializing with URI: %s", artifact_uri) + ) + _logger.addHandler(handler) + _logger.setLevel(logging.DEBUG if self.debug_mode else logging.INFO) + _logger.info("Cloudsmith: Initializing with URI: %s", artifact_uri) if not self.api_key: raise ValueError("CLOUDSMITH_API_KEY environment variable must be set") @@ -78,13 +76,12 @@ def __init__(self, artifact_uri, tracking_uri=None): # Parse the URI self.owner, self.repository, self.base_path = self._parse_uri(artifact_uri) - if self.debug_mode: - _logger.info( - "Cloudsmith: owner=%s, repository=%s, base_path=%s", - self.owner, - self.repository, - self.base_path, - ) + _logger.debug( + "Cloudsmith: owner=%s, repository=%s, base_path=%s", + self.owner, + self.repository, + self.base_path, + ) @staticmethod def _parse_uri(uri: str) -> tuple: @@ -191,11 +188,9 @@ def _paginate_packages(self, query: Optional[str]) -> list: if attempt: params["query"] = attempt try: - if self.debug_mode: - _logger.info( - "Cloudsmith: list packages (single page), query=%r", - attempt, - ) + _logger.debug( + "Cloudsmith: list packages (single page), query=%r", attempt + ) response = requests.get( url, headers=self._get_headers(), @@ -203,33 +198,31 @@ def _paginate_packages(self, query: Optional[str]) -> list: ) if response.status_code != 200: last_error = response.text - if self.debug_mode: - _logger.warning( - "Cloudsmith: list failed for query=%r: %s", - attempt, - last_error, - ) + _logger.debug( + "Cloudsmith: list failed for query=%r: %s", + attempt, + last_error, + ) continue packages = response.json() if not isinstance(packages, list): - if self.debug_mode: - _logger.warning( - "Cloudsmith: unexpected payload for query=%r: %s", - attempt, - str(packages)[:200], - ) - continue - if self.debug_mode: - _logger.info( - "Cloudsmith: fetched %d packages for query=%r", - len(packages), + _logger.debug( + "Cloudsmith: unexpected payload for query=%r: %s", attempt, + str(packages)[:200], ) + continue + _logger.debug( + "Cloudsmith: fetched %d packages for query=%r", + len(packages), + attempt, + ) return packages - except Exception as e: + except (requests.RequestException, ValueError) as e: last_error = str(e) - if self.debug_mode: - _logger.exception("Cloudsmith: list error for query=%r", attempt) + _logger.debug( + "Cloudsmith: list error for query=%r", attempt, exc_info=True + ) raise RuntimeError( "Failed to list packages. Last error: %s" % (last_error or "unknown error") @@ -355,8 +348,7 @@ def _wait_for_package_sync( Returns: bool: True if package is synchronized, False if timeout """ - if self.debug_mode: - _logger.info(f"Cloudsmith: Waiting for package sync: {package_identifier}") + _logger.info(f"Cloudsmith: Waiting for package sync: {package_identifier}") start_time = time.time() while time.time() - start_time < max_wait_time: @@ -378,7 +370,7 @@ def _wait_for_package_sync( ) return True elif status_data.get("is_sync_failed", False): - _logger.warning( + _logger.debug( "Cloudsmith: Package sync failed: %s", package_identifier, ) @@ -387,10 +379,10 @@ def _wait_for_package_sync( time.sleep(2) # Wait 2 seconds before next check except Exception as e: - _logger.warning("Cloudsmith: Error checking sync status: %s", e) + _logger.debug("Cloudsmith: Error checking sync status: %s", e) time.sleep(2) - _logger.warning(f"Cloudsmith: Package sync timeout: " f"{package_identifier}") + _logger.debug(f"Cloudsmith: Package sync timeout: " f"{package_identifier}") return False def _upload_file_to_cloudsmith( @@ -486,8 +478,7 @@ def log_artifact(self, local_file: str, artifact_path: Optional[str] = None): local_file: Path to the local file to upload artifact_path: Optional path within the artifact store """ - if self.debug_mode: - _logger.info(f"Cloudsmith: log_artifact({local_file}, {artifact_path})") + _logger.debug("Cloudsmith: log_artifact(%s, %s)", local_file, artifact_path) verify_artifact_path(artifact_path) @@ -510,8 +501,7 @@ def log_artifacts(self, local_dir: str, artifact_path: Optional[str] = None): local_dir: Path to the local directory containing artifacts artifact_path: Optional path within the artifact store """ - if self.debug_mode: - _logger.info(f"Cloudsmith: log_artifacts({local_dir}, {artifact_path})") + _logger.debug("Cloudsmith: log_artifacts(%s, %s)", local_dir, artifact_path) verify_artifact_path(artifact_path) local_dir = os.path.abspath(local_dir) @@ -542,8 +532,7 @@ def list_artifacts(self, path: Optional[str] = None) -> List[FileInfo]: requested path, building results from a full preloaded tree of all artifacts for the run. """ - if self.debug_mode: - _logger.info(f"Cloudsmith: list_artifacts({path})") + _logger.debug("Cloudsmith: list_artifacts(%s)", path) # Normalize base and requested paths base_prefix = self.base_path.strip("/") if self.base_path else "" @@ -573,23 +562,20 @@ def _is_valid_run_id(x: Optional[str]) -> bool: q_parts.append(f"tag:experiment-{exp_id}") query = " and ".join(q_parts) - if self.debug_mode: - _logger.info( - ( - "Cloudsmith: base_prefix=%r, requested_path=%r, " - "exp_id=%r, run_id=%r, query=%r" - ), - base_prefix, - requested_path, - exp_id, - run_id, - query, - ) + _logger.debug( + ( + "Cloudsmith: base_prefix=%r, requested_path=%r, " + "exp_id=%r, run_id=%r, query=%r" + ), + base_prefix, + requested_path, + exp_id, + run_id, + query, + ) packages = self._paginate_packages(query) - - if self.debug_mode: - _logger.info("Cloudsmith: fetched %d packages", len(packages)) + _logger.debug("Cloudsmith: fetched %d packages", len(packages)) # Helper: choose package file size robustly def _pkg_size(pkg: dict) -> int: @@ -602,11 +588,11 @@ def _pkg_size(pkg: dict) -> int: return int(f.get("size") or 0) if files: return int(files[0].get("size") or 0) - except Exception: + except (KeyError, ValueError, TypeError): pass try: return int(size or 0) - except Exception: + except (TypeError, ValueError): return 0 # Helper: normalize an absolute artifact path to repo-root relative @@ -667,11 +653,10 @@ def _new_node(): node = node["dirs"][seg] else: # Path doesn't exist - if self.debug_mode: - _logger.info( - "Cloudsmith: requested path %r not found in tree", - requested_path, - ) + _logger.debug( + "Cloudsmith: requested path %r not found in tree", + requested_path, + ) return [] # Materialize immediate children as FileInfo @@ -694,12 +679,11 @@ def _new_node(): ) ) - if self.debug_mode: - _logger.info( - "Cloudsmith: list_artifacts -> %d items for path=%r", - len(results), - path, - ) + _logger.debug( + "Cloudsmith: list_artifacts -> %d items for path=%r", + len(results), + path, + ) return results def _extract_artifact_path_from_package(self, package: dict) -> Optional[str]: @@ -743,10 +727,9 @@ def _download_file(self, remote_file_path: str, local_path: str): """ Download a file from Cloudsmith to local storage. """ - if self.debug_mode: - _logger.info( - f"Cloudsmith: _download_file({remote_file_path}, {local_path})" - ) + _logger.debug( + "Cloudsmith: _download_file(%s, %s)", remote_file_path, local_path + ) # Fetch packages with strict per-run query when possible exp_guess, run_guess = self._infer_mlflow_context(self.base_path) @@ -779,14 +762,12 @@ def _is_valid_run_id(x: Optional[str]) -> bool: else: expected_full_path = remote_file_path.strip("/") - if self.debug_mode: - _logger.info( - f"Cloudsmith: Looking for file with path: {expected_full_path}" - ) - _logger.info( - f"Cloudsmith: Base path: {self.base_path}, " - f"Remote path: {remote_file_path}" - ) + _logger.debug("Cloudsmith: Looking for file with path: %s", expected_full_path) + _logger.debug( + "Cloudsmith: Base path: %s, Remote path: %s", + self.base_path, + remote_file_path, + ) for package in packages: description = package.get("description", "") or "" @@ -806,15 +787,14 @@ def _is_valid_run_id(x: Optional[str]) -> bool: .split("(", 1)[0] .strip() ) - except Exception: + except (IndexError, ValueError): desc_path = None - if self.debug_mode: - _logger.info( - "Cloudsmith: Checking package %s with desc_path=%s", - package.get("name", "unknown"), - desc_path, - ) + _logger.debug( + "Cloudsmith: Checking package %s with desc_path=%s", + package.get("name", "unknown"), + desc_path, + ) # Try multiple path matching strategies possible_paths = [expected_full_path] @@ -833,11 +813,10 @@ def _is_valid_run_id(x: Optional[str]) -> bool: for possible_path in possible_paths: if desc_path_clean == possible_path: target_package = package - if self.debug_mode: - _logger.info( - ("Cloudsmith: Found match via description " "path: %s"), - desc_path_clean, - ) + _logger.debug( + "Cloudsmith: Found match via description path: %s", + desc_path_clean, + ) break if target_package: break @@ -849,11 +828,10 @@ def _is_valid_run_id(x: Optional[str]) -> bool: for possible_path in possible_paths: if tag_path == possible_path: target_package = package - if self.debug_mode: - _logger.info( - ("Cloudsmith: Found match via tag " "path: %s"), - tag_path, - ) + _logger.debug( + "Cloudsmith: Found match via tag path: %s", + tag_path, + ) break if target_package: break @@ -864,53 +842,44 @@ def _is_valid_run_id(x: Optional[str]) -> bool: filename = package.get("filename", "") if filename == os.path.basename(remote_file_path): target_package = package - if self.debug_mode: - _logger.info( - "Cloudsmith: Found match via filename: %s", - filename, - ) + _logger.debug("Cloudsmith: Found match via filename: %s", filename) break if not target_package: - if self.debug_mode: - _logger.error( - "Cloudsmith: No package found for path: %s", - expected_full_path, + _logger.debug( + "Cloudsmith: No package found for path: %s", expected_full_path + ) + _logger.debug("Cloudsmith: Available packages:") + for pkg in packages[:5]: # Show first 5 packages for debugging + desc = pkg.get("description", "") + desc_path = None + if "MLflow artifact:" in desc: + try: + desc_path = ( + desc.split("MLflow artifact:", 1)[1] + .split("(", 1)[0] + .strip() + ) + except (IndexError, ValueError): + pass + _logger.debug( + " - %s: %s", + pkg.get("name", "unknown"), + desc_path, ) - _logger.error("Cloudsmith: Available packages:") - for pkg in packages[:5]: # Show first 5 packages for debugging - desc = pkg.get("description", "") - desc_path = None - if "MLflow artifact:" in desc: - try: - desc_path = ( - desc.split("MLflow artifact:", 1)[1] - .split("(", 1)[0] - .strip() - ) - except Exception: - pass - _logger.error( - " - %s: %s", - pkg.get("name", "unknown"), - desc_path, - ) raise FileNotFoundError(f"Artifact not found: {remote_file_path}") cdn_url = target_package.get("cdn_url") or target_package.get("download_url") if not cdn_url: raise RuntimeError(f"No download URL available for {remote_file_path}") - - if self.debug_mode: - _logger.info(f"Cloudsmith: Downloading from {cdn_url}") + _logger.info("Cloudsmith: Downloading from %s", cdn_url) # Use authentication for private repositories headers = {"Authorization": f"Bearer {self.api_key}"} response = requests.get(cdn_url, stream=True, headers=headers) - if self.debug_mode: - _logger.info(f"Cloudsmith: Response status: {response.status_code}") - if response.status_code != 200: - _logger.info(f"Cloudsmith: Response headers: {dict(response.headers)}") + _logger.debug("Cloudsmith: Response status: %s", response.status_code) + if response.status_code != 200: + _logger.debug("Cloudsmith: Response headers: %s", dict(response.headers)) if response.status_code != 200: error_msg = ( @@ -925,9 +894,7 @@ def _is_valid_run_id(x: Optional[str]) -> bool: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) - - if self.debug_mode: - _logger.info(f"Cloudsmith: Download complete: {local_path}") + _logger.debug("Cloudsmith: Download complete: %s", local_path) def _path_has_prefix(self, path: str, prefix: str) -> bool: """Return True if path equals prefix or starts with prefix + '/'.""" @@ -972,8 +939,7 @@ def download_artifacts( For single files, returns the file path. For directories, returns the directory path. """ - if self.debug_mode: - _logger.info(f"Cloudsmith: download_artifacts({artifact_path}, {dst_path})") + _logger.debug("Cloudsmith: download_artifacts(%s, %s)", artifact_path, dst_path) # Use artifact_path or default to empty string (root) artifact_path = artifact_path or "" From 164086f8de0e9bb7fa749ba0b84010c2bb7a9912 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Wed, 3 Sep 2025 16:46:04 +0100 Subject: [PATCH 2/5] use cli-code for upload raw multipart packages --- plugin/cloudsmith_repository.py | 173 +++++++++++++++------------- tests/test_cloudsmith_repository.py | 97 ++++++++++++++++ 2 files changed, 191 insertions(+), 79 deletions(-) diff --git a/plugin/cloudsmith_repository.py b/plugin/cloudsmith_repository.py index d99110a..6c72692 100644 --- a/plugin/cloudsmith_repository.py +++ b/plugin/cloudsmith_repository.py @@ -26,7 +26,7 @@ VERSION = "0.1.0" CLOUDSMITH_API_BASE = "https://api.cloudsmith.io/v1" - +CHUNK_SIZE = 100 * 1024 * 1024 # 100MB threshold class CloudsmithArtifactRepository(ArtifactRepository): """ @@ -386,89 +386,104 @@ def _wait_for_package_sync( return False def _upload_file_to_cloudsmith( - self, local_file: str, artifact_path: Optional[str] = None - ) -> str: - """ - Upload a file to Cloudsmith as a RAW package. - - Args: - local_file: Path to the local file to upload - artifact_path: MLflow artifact path - - Returns: - str: Package identifier - """ - if self.debug_mode: + self, local_file: str, artifact_path: Optional[str] = None + ) -> str: + """ + Upload a file to Cloudsmith as a RAW package. + + Args: + local_file: Path to the local file to upload + artifact_path: MLflow artifact path + + Returns: + str: Package identifier + """ _logger.info("Cloudsmith: Uploading %s as %s", local_file, artifact_path) - filename = os.path.basename(local_file) - md5_hash, sha256_hash = self._calculate_checksums(local_file) - - # Step 1: Request upload URL - upload_request = { - "filename": filename, - "md5_checksum": md5_hash, - "sha256_checksum": sha256_hash, - } - - files_url = f"{CLOUDSMITH_API_BASE}/files/{self.owner}/{self.repository}/" - response = requests.post( - files_url, headers=self._get_headers(), json=upload_request - ) - - if response.status_code != 202: - raise RuntimeError(f"Failed to request upload URL: {response.text}") - - upload_data = response.json() - upload_url = upload_data["upload_url"] - upload_fields = upload_data.get("upload_fields", {}) - file_identifier = upload_data["identifier"] - - if self.debug_mode: - _logger.info( - "Cloudsmith: Got upload URL and identifier: %s", - file_identifier, - ) - - # Step 2: Upload file to S3 - with open(local_file, "rb") as f: - files = {"file": (filename, f, "application/octet-stream")} - upload_response = requests.post(upload_url, data=upload_fields, files=files) - - if upload_response.status_code not in [200, 201, 204]: - raise RuntimeError(f"Failed to upload file: {upload_response.text}") - - # Step 3: Create RAW package - package_metadata = self._generate_package_metadata(artifact_path, local_file) - package_data = { - "package_file": file_identifier, - **package_metadata, - } - - raw_package_url = ( - f"{CLOUDSMITH_API_BASE}/packages/" - f"{self.owner}/{self.repository}/upload/raw/" - ) - package_response = requests.post( - raw_package_url, headers=self._get_headers(), json=package_data - ) - - if package_response.status_code != 201: - raise RuntimeError(f"Failed to create RAW package: {package_response.text}") - - package_info = package_response.json() - package_identifier = package_info["identifier_perm"] + filename = os.path.basename(local_file) + file_size = os.path.getsize(local_file) + md5_hash, sha256_hash = self._calculate_checksums(local_file) + is_multi = file_size > CHUNK_SIZE + + # 1. Request upload + upload_req = { + "filename": filename, + "md5_checksum": md5_hash, + "sha256_checksum": sha256_hash, + } + if is_multi: + upload_req["method"] = "put_parts" + + files_url = f"{CLOUDSMITH_API_BASE}/files/{self.owner}/{self.repository}/" + resp = requests.post(files_url, headers=self._get_headers(), json=upload_req) + if resp.status_code != 202: + raise RuntimeError(f"Failed to request upload URL: {resp.text}") + data = resp.json() + upload_url = data["upload_url"] + upload_fields = data.get("upload_fields", {}) + file_id = data["identifier"] + _logger.debug("Cloudsmith: Upload prepared id=%s multi=%s", file_id, is_multi) + + # 2. Upload file (single vs multi) + if is_multi: + part = 1 + with open(local_file, "rb") as fh: + while True: + chunk = fh.read(CHUNK_SIZE) + if not chunk: + break + r = requests.put( + upload_url, + headers={"X-Api-Key": self.api_key}, + params={"upload_id": file_id, "part_number": part}, + data=chunk, + ) + if r.status_code not in (200, 201, 204): + raise RuntimeError( + f"Failed uploading part {part}: {r.status_code} {r.text[:160]}" + ) + part += 1 + complete_url = ( + f"{CLOUDSMITH_API_BASE}/files/{self.owner}/{self.repository}/{file_id}/complete/" + ) + comp_payload = {"upload_id": file_id, "complete": True} + c_resp = requests.post( + complete_url, headers=self._get_headers(), json=comp_payload + ) + if c_resp.status_code not in (200, 201, 202): + raise RuntimeError( + f"Failed to complete multi-part upload: {c_resp.status_code} {c_resp.text[:160]}" + ) + _logger.debug("Cloudsmith: multi-part upload complete id=%s", file_id) + else: + with open(local_file, "rb") as fh: + files = {"file": (filename, fh, "application/octet-stream")} + u_resp = requests.post( + upload_url, data=upload_fields, files=files + ) + if u_resp.status_code not in (200, 201, 204): + raise RuntimeError( + f"Failed to upload file: {u_resp.status_code} {u_resp.text[:160]}" + ) - if self.debug_mode: - _logger.info( - "Cloudsmith: Created package with identifier: %s", - package_identifier, + # 3. Create package + meta = self._generate_package_metadata(artifact_path, local_file) + pkg_payload = {"package_file": file_id, **meta} + raw_url = ( + f"{CLOUDSMITH_API_BASE}/packages/{self.owner}/{self.repository}/upload/raw/" ) + p_resp = requests.post(raw_url, headers=self._get_headers(), json=pkg_payload) + if p_resp.status_code != 201: + raise RuntimeError( + f"Failed to create RAW package: {p_resp.status_code} {p_resp.text[:200]}" + ) + pkg = p_resp.json() + identifier = pkg["identifier_perm"] + _logger.debug("Cloudsmith: Created package %s", identifier) - # Wait for package to be synchronized if needed - self._wait_for_package_sync(package_identifier) - - return package_identifier + # 4. Wait for sync + self._wait_for_package_sync(identifier) + return identifier def log_artifact(self, local_file: str, artifact_path: Optional[str] = None): """ diff --git a/tests/test_cloudsmith_repository.py b/tests/test_cloudsmith_repository.py index fe325d7..3cb5718 100644 --- a/tests/test_cloudsmith_repository.py +++ b/tests/test_cloudsmith_repository.py @@ -184,6 +184,76 @@ def test_upload_file_success(self, mock_get, mock_post): finally: os.unlink(temp_path) + @patch("requests.post") + @patch("requests.put") + @patch("requests.get") + def test_upload_file_multipart_success( + self, mock_get, mock_put, mock_post + ): + """Test successful multi-part upload (minimal path).""" + # Force small CHUNK_SIZE so we don't create a huge file + from plugin import cloudsmith_repository as csr + original_chunk = csr.CHUNK_SIZE + csr.CHUNK_SIZE = 10 # bytes + try: + # Prepare file larger than 2 chunks + import tempfile + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as f: + f.write(b"0123456789ABCDEFGHIJ") # 20 bytes -> 2 chunks of 10 + temp_path = f.name + + # Mock sequence: + # 1) POST upload URL (202) + # 2) POST completion (200) + # 3) POST package create (201) + mock_upload_resp = Mock() + mock_upload_resp.status_code = 202 + mock_upload_resp.json.return_value = { + "upload_url": "https://upload.example.com/file", + "upload_fields": {}, + "identifier": "multi-id-123", + } + mock_complete_resp = Mock() + mock_complete_resp.status_code = 200 + mock_complete_resp.json.return_value = {} + mock_pkg_resp = Mock() + mock_pkg_resp.status_code = 201 + mock_pkg_resp.json.return_value = { + "identifier_perm": "pkg-multi-identifier" + } + mock_post.side_effect = [ + mock_upload_resp, + mock_complete_resp, + mock_pkg_resp, + ] + + # PUT per chunk + mock_put_resp = Mock() + mock_put_resp.status_code = 200 + mock_put.return_value = mock_put_resp + + # Sync polling GET + mock_sync_resp = Mock() + mock_sync_resp.status_code = 200 + mock_sync_resp.json.return_value = { + "is_sync_completed": True, + "is_sync_failed": False, + } + mock_get.return_value = mock_sync_resp + + result = self.repo._upload_file_to_cloudsmith( + temp_path, "test/mp.bin" + ) + self.assertEqual(result, "pkg-multi-identifier") + # Ensure multi-part path used (>=2 PUT calls) + self.assertGreaterEqual(mock_put.call_count, 2) + # Ensure completion + package creation posts present + self.assertEqual(mock_post.call_count, 3) + finally: + csr.CHUNK_SIZE = original_chunk + if 'temp_path' in locals() and os.path.exists(temp_path): + os.unlink(temp_path) + @patch( "plugin.cloudsmith_repository.CloudsmithArtifactRepository." "_upload_file_to_cloudsmith" @@ -368,6 +438,33 @@ def test_integration_upload_and_list(self): finally: os.unlink(temp_path) + def test_integration_large_multipart_upload(self): # pragma: no cover + """Test a large (multi-part) upload path if integration enabled. + + Creates a file just over CHUNK_SIZE to force multi-part logic and + verifies it appears in listing under the integration directory. + """ + from plugin import cloudsmith_repository as csr + import tempfile + large_path = None + try: + with tempfile.NamedTemporaryFile(delete=False) as f: + # Create sparse file of size CHUNK_SIZE + 1 byte + f.seek(csr.CHUNK_SIZE) + f.write(b"X") + large_path = f.name + + # Upload (will trigger multi-part due to size) + self.repo.log_artifact(large_path, "integration/large") + + # List artifacts under integration to ensure presence + artifacts = self.repo.list_artifacts("integration") + matched = [a for a in artifacts if "large" in a.path] + self.assertTrue(matched, "Large multi-part artifact not found in listing") + finally: + if large_path and os.path.exists(large_path): + os.unlink(large_path) + if __name__ == "__main__": # Set up logging for tests From df54fd0446bc85c00e9ac61eb5b576fc8b37d1f7 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Wed, 3 Sep 2025 16:48:47 +0100 Subject: [PATCH 3/5] use cli-code for for wait_for_sync func --- plugin/cloudsmith_repository.py | 114 ++++++++++++++++++++++++++------ 1 file changed, 92 insertions(+), 22 deletions(-) diff --git a/plugin/cloudsmith_repository.py b/plugin/cloudsmith_repository.py index 6c72692..6144b72 100644 --- a/plugin/cloudsmith_repository.py +++ b/plugin/cloudsmith_repository.py @@ -348,42 +348,112 @@ def _wait_for_package_sync( Returns: bool: True if package is synchronized, False if timeout """ - _logger.info(f"Cloudsmith: Waiting for package sync: {package_identifier}") + _logger.debug("Cloudsmith: Waiting for package sync: %s", package_identifier) + + wait_interval = 2.0 + total_wait_interval = max(1.0, wait_interval) + last_progress = 0 + first = True + start = time.time() + + def _progress(payload: dict) -> int: + for key in ("sync_progress", "progress", "percentage"): + val = payload.get(key) + if isinstance(val, (int, float)): + try: + iv = int(val) + if 0 <= iv <= 100: + return max(1, iv) + except (TypeError, ValueError): + pass + return 0 + + while True: + elapsed = time.time() - start + if elapsed >= max_wait_time: + _logger.warning( + "Cloudsmith: Package sync timeout after %.1fs: %s", + elapsed, + package_identifier, + ) + return False - start_time = time.time() - while time.time() - start_time < max_wait_time: try: status_url = ( f"{CLOUDSMITH_API_BASE}/packages/" f"{self.owner}/{self.repository}/" f"{package_identifier}/status/" ) - response = requests.get(status_url, headers=self._get_headers()) - - if response.status_code == 200: - status_data = response.json() - if status_data.get("is_sync_completed", False): - if self.debug_mode: - _logger.info( - "Cloudsmith: Package synchronized: %s", + resp = requests.get(status_url, headers=self._get_headers()) + if resp.status_code == 404: + _logger.debug( + "Cloudsmith: status 404 (not ready) for %s", + package_identifier, + ) + elif resp.status_code != 200: + _logger.debug( + "Cloudsmith: status %s polling %s", + resp.status_code, + package_identifier, + ) + else: + data = resp.json() + ok = bool(data.get("is_sync_completed")) + failed = bool(data.get("is_sync_failed")) + prog = _progress(data) + if prog > last_progress: + delta = prog - last_progress + last_progress = prog + _logger.debug( + "Cloudsmith: sync progress +%d -> %d%% (%s)", + delta, + prog, + package_identifier, + ) + if ok: + if last_progress < 100: + _logger.debug( + "Cloudsmith: forcing progress to 100%% (%s)", package_identifier, ) - return True - elif status_data.get("is_sync_failed", False): _logger.debug( - "Cloudsmith: Package sync failed: %s", + "Cloudsmith: Package synchronized in %.2fs: %s", + elapsed, package_identifier, ) + return True + if failed: + reason = data.get("sync_failure_reason") or data.get("reason") + if reason: + _logger.warning( + "Cloudsmith: Package sync failed (%s): %s", + package_identifier, + reason, + ) + else: + _logger.warning( + "Cloudsmith: Package sync failed (no reason): %s", + package_identifier, + ) return False + except requests.RequestException as e: + _logger.debug( + "Cloudsmith: transient HTTP error waiting for %s: %s", + package_identifier, + e, + ) + except ValueError as e: + _logger.debug( + "Cloudsmith: JSON parse error waiting for %s: %s", + package_identifier, + e, + ) - time.sleep(2) # Wait 2 seconds before next check - - except Exception as e: - _logger.debug("Cloudsmith: Error checking sync status: %s", e) - time.sleep(2) - - _logger.debug(f"Cloudsmith: Package sync timeout: " f"{package_identifier}") - return False + if first: + first = False + else: + time.sleep(total_wait_interval) + total_wait_interval = min(300.0, total_wait_interval + wait_interval) def _upload_file_to_cloudsmith( self, local_file: str, artifact_path: Optional[str] = None From c44c5c6e74e9413c3bd2a9b0e368b8b122bd32dc Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Mon, 13 Oct 2025 13:33:12 +0100 Subject: [PATCH 4/5] EG code review --- plugin/cloudsmith_repository.py | 48 +++++++++++++++------------------ 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/plugin/cloudsmith_repository.py b/plugin/cloudsmith_repository.py index 6144b72..121cfa8 100644 --- a/plugin/cloudsmith_repository.py +++ b/plugin/cloudsmith_repository.py @@ -385,17 +385,18 @@ def _progress(payload: dict) -> int: f"{package_identifier}/status/" ) resp = requests.get(status_url, headers=self._get_headers()) - if resp.status_code == 404: - _logger.debug( - "Cloudsmith: status 404 (not ready) for %s", - package_identifier, - ) - elif resp.status_code != 200: - _logger.debug( - "Cloudsmith: status %s polling %s", - resp.status_code, - package_identifier, - ) + if resp.status_code != 200: + if resp.status_code == 404: + _logger.debug( + "Cloudsmith: status 404 (not ready) for %s", + package_identifier, + ) + else: + _logger.debug( + "Cloudsmith: status %s polling %s", + resp.status_code, + package_identifier, + ) else: data = resp.json() ok = bool(data.get("is_sync_completed")) @@ -410,11 +411,10 @@ def _progress(payload: dict) -> int: prog, package_identifier, ) - if ok: - if last_progress < 100: - _logger.debug( - "Cloudsmith: forcing progress to 100%% (%s)", - package_identifier, + if ok and last_progress < 100: + _logger.debug( + "Cloudsmith: forcing progress to 100% (%s)", + package_identifier, ) _logger.debug( "Cloudsmith: Package synchronized in %.2fs: %s", @@ -424,17 +424,11 @@ def _progress(payload: dict) -> int: return True if failed: reason = data.get("sync_failure_reason") or data.get("reason") - if reason: - _logger.warning( - "Cloudsmith: Package sync failed (%s): %s", - package_identifier, - reason, - ) - else: - _logger.warning( - "Cloudsmith: Package sync failed (no reason): %s", - package_identifier, - ) + _logger.warning( + "Cloudsmith: Package sync failed (%s): %s", + package_identifier, + reason or "no reason", + ) return False except requests.RequestException as e: _logger.debug( From 2ecc22d43a93692677a5eec810fc3ac4a05d9692 Mon Sep 17 00:00:00 2001 From: Bartosz Blizniak Date: Mon, 13 Oct 2025 13:57:08 +0100 Subject: [PATCH 5/5] copilot code review commens --- plugin/cloudsmith_repository.py | 4 ++-- tests/test_cloudsmith_repository.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/cloudsmith_repository.py b/plugin/cloudsmith_repository.py index 121cfa8..7577dca 100644 --- a/plugin/cloudsmith_repository.py +++ b/plugin/cloudsmith_repository.py @@ -1,3 +1,4 @@ +# Copyright 2025 Cloudsmith Ltd """ Cloudsmith MLflow plugin - Main repository implementation. @@ -957,8 +958,7 @@ def _is_valid_run_id(x: Optional[str]) -> bool: headers = {"Authorization": f"Bearer {self.api_key}"} response = requests.get(cdn_url, stream=True, headers=headers) _logger.debug("Cloudsmith: Response status: %s", response.status_code) - if response.status_code != 200: - _logger.debug("Cloudsmith: Response headers: %s", dict(response.headers)) + _logger.debug("Cloudsmith: Response headers: %s", dict(response.headers)) if response.status_code != 200: error_msg = ( diff --git a/tests/test_cloudsmith_repository.py b/tests/test_cloudsmith_repository.py index 3cb5718..2a59f72 100644 --- a/tests/test_cloudsmith_repository.py +++ b/tests/test_cloudsmith_repository.py @@ -251,7 +251,7 @@ def test_upload_file_multipart_success( self.assertEqual(mock_post.call_count, 3) finally: csr.CHUNK_SIZE = original_chunk - if 'temp_path' in locals() and os.path.exists(temp_path): + if temp_path and os.path.exists(temp_path): os.unlink(temp_path) @patch(