From 0553e8019e575863ee7e15dc4dd87624ba354537 Mon Sep 17 00:00:00 2001 From: colinmoynes Date: Thu, 19 Mar 2026 17:04:11 +0000 Subject: [PATCH 1/2] vulnerabilities flag added, and results now reflected at parent tag correctly for multi-arch --- Docker/Sonar/sonar.py | 431 +++++++++++++++++++++++++++++++----------- 1 file changed, 316 insertions(+), 115 deletions(-) diff --git a/Docker/Sonar/sonar.py b/Docker/Sonar/sonar.py index 55ca081..ede0970 100755 --- a/Docker/Sonar/sonar.py +++ b/Docker/Sonar/sonar.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +# Copyright 2026 Cloudsmith Ltd import sys import os @@ -174,6 +175,22 @@ def format_status(status_str): if status_str == "Failed": return f"[bold red]{status_str}[/bold red]" return status_str +def format_vuln_status(vuln_status): + """Returns a rich-formatted vulnerability scan status string.""" + if not vuln_status: + return "[dim]N/A[/dim]" + if vuln_status == "Scanned": + return f"[green]{vuln_status}[/green]" + if vuln_status == "Not Scanned": + return f"[dim]{vuln_status}[/dim]" + if vuln_status == "Not Supported": + return f"[dim yellow]{vuln_status}[/dim yellow]" + if vuln_status == "Scanning": + return f"[yellow]{vuln_status}[/yellow]" + if vuln_status == "Failed": + return f"[bold red]{vuln_status}[/bold red]" + return vuln_status + def parse_filter_criteria(filter_str): """Parses and strips 'downloads' criteria from filter string.""" if not filter_str: return "", [] @@ -199,6 +216,149 @@ def matches_criteria(value, criteria): if op == '<=' and not (value <= target): return False return True +def empty_vuln_summary(): + """Returns an empty vulnerability summary dict.""" + return { + "vuln_status": None, + "critical": 0, + "high": 0, + "medium": 0, + "low": 0, + "unknown": 0, + "total": 0, + } + +def fetch_vulnerability_data(workspace, repo, slug_perm): + """Fetches vulnerability scan data for a package slug_perm. + + 1. Lists scans via GET /v1/vulnerabilities/{owner}/{repo}/{package}/ + 2. 
Fetches the latest scan details via GET /v1/vulnerabilities/{owner}/{repo}/{package}/{scan_id}/ + + Returns a vulnerability summary dict. + """ + summary = empty_vuln_summary() + + logger.debug(f"Vuln slug perm: {slug_perm}") + + if not slug_perm: + summary["vuln_status"] = "Not Supported" + return summary + + # Step 1: List scans for the package + list_url = f"https://api.cloudsmith.io/v1/vulnerabilities/{workspace}/{repo}/{slug_perm}/" + scans = make_request(list_url) + + if scans is None: + summary["vuln_status"] = "Not Supported" + return summary + + # The response may be a list or a paginated dict with 'results' + scan_list = [] + if isinstance(scans, list): + scan_list = scans + elif isinstance(scans, dict) and "results" in scans: + scan_list = scans["results"] + elif isinstance(scans, dict) and "identifier" in scans: + # Single scan returned directly + scan_list = [scans] + + if not scan_list: + summary["vuln_status"] = "Not Scanned" + return summary + + # Pick the most recent scan (first in list, typically sorted by recency) + latest_scan = scan_list[0] + scan_id = latest_scan.get("identifier") or latest_scan.get("id") + + if not scan_id: + summary["vuln_status"] = "Not Scanned" + return summary + + # Check if scan is still in progress + scan_status = latest_scan.get("status") or latest_scan.get("state") + if scan_status and scan_status.lower() in ("pending", "running", "in_progress"): + summary["vuln_status"] = "Scanning" + return summary + + # Step 2: Fetch scan details + detail_url = f"https://api.cloudsmith.io/v1/vulnerabilities/{workspace}/{repo}/{slug_perm}/{scan_id}/" + scan_detail = make_request(detail_url) + logger.debug(f"Scan details: {scan_detail}") + + if not scan_detail: + # Fallback: use top-level fields from list response if available + summary["vuln_status"] = "Scanned" + if latest_scan.get("num_vulnerabilities"): + _extract_severity_from_scans(latest_scan.get("scans", []), summary) + return summary + + summary["vuln_status"] = "Scanned" + 
+ # The actual structure has vulnerabilities nested under scans[].results[] + # Each result has a "severity" field (Critical, High, Medium, Low, etc.) + scan_entries = scan_detail.get("scans", []) + if scan_entries: + _extract_severity_from_scans(scan_entries, summary) + else: + # Fallback: try top-level num_* fields or a vulnerabilities list + for sev in ("critical", "high", "medium", "low", "unknown"): + count = ( + scan_detail.get(f"num_{sev}") + or scan_detail.get(f"{sev}_count") + or scan_detail.get(sev) + ) + if count is not None: + summary[sev] = int(count) + + summary["total"] = sum(summary[s] for s in ("critical", "high", "medium", "low", "unknown")) + + logger.debug(f"Vulnerability data for {slug_perm}: {summary}") + return summary + + +def _extract_severity_from_scans(scan_entries, summary): + """Aggregates severity counts from the scans[].results[] structure.""" + for scan_entry in scan_entries: + results = scan_entry.get("results", []) + for vuln in results: + sev = (vuln.get("severity") or "unknown").lower() + if sev in summary: + summary[sev] += 1 + else: + summary["unknown"] += 1 + +def rollup_vuln_summaries(children_vulns): + """Rolls up vulnerability summaries from child images into a parent summary. + + Uses the max count per severity across children rather than summing, + since child images in a multi-arch manifest typically share the same + CVEs from common base packages. 
+ """ + if not children_vulns: + return empty_vuln_summary() + + rollup = empty_vuln_summary() + + # Determine overall scan status + statuses = set(v.get("vuln_status") for v in children_vulns if v.get("vuln_status")) + + if "Scanned" in statuses: + rollup["vuln_status"] = "Scanned" + elif "Scanning" in statuses: + rollup["vuln_status"] = "Scanning" + elif "Not Scanned" in statuses: + rollup["vuln_status"] = "Not Scanned" + else: + rollup["vuln_status"] = "Not Supported" + + # Use max across children to avoid double-counting shared CVEs + for sev in ("critical", "high", "medium", "low", "unknown"): + rollup[sev] = max((v.get(sev, 0) for v in children_vulns), default=0) + + rollup["total"] = sum(rollup[s] for s in ("critical", "high", "medium", "low", "unknown")) + + return rollup + def batch_delete_packages(workspace, repo, slugs): """Deletes a list of package slugs in batches to respect rate limits.""" deleted = set() @@ -245,7 +405,7 @@ def get_manifest_children(workspace, repo, img, digest): # --- Core Logic --- -def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknown"): +def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknown", fetch_vulns=False): """Fetches data for a specific digest (child image) and returns data dict.""" # 1. 
Fetch Manifest to get Architecture (Only if unknown) @@ -273,10 +433,14 @@ def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknow version = digest.replace("sha256:", "") api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=version:{version}" pkg_details = make_request(api_url, {"Cache-Control": "no-cache"}) + + logger.debug(f"{pkg_details}") status_raw = "Unknown" dl = 0 uploaded_at = "" + slug = "" + slug_perm = "" if pkg_details: statuses = set(find_key_recursive(pkg_details, 'status_str')) @@ -293,7 +457,22 @@ def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknow if uploaded_at: uploaded = uploaded_at - return { + slugs = find_key_recursive(pkg_details, 'slug') + if slugs: + slug = slugs[0] + + # Only grab slug_perm from the top-level package object, not nested sub-objects + if isinstance(pkg_details, list) and len(pkg_details) > 0: + slug_perm = pkg_details[0].get('slug_perm', '') + elif isinstance(pkg_details, dict): + slug_perm = pkg_details.get('slug_perm', '') + + # 3. 
Fetch vulnerability data if requested + vuln_summary = empty_vuln_summary() + if fetch_vulns and slug_perm: + vuln_summary = fetch_vulnerability_data(workspace, repo, slug_perm) + + result = { "tag": ntag_display, "type": "image", "platform": platform, @@ -301,10 +480,15 @@ def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknow "downloads": dl, "uploaded": uploaded, "digest": digest, - "is_child": True + "is_child": True, + "slug": slug, + "slug_perm": slug_perm, } + result.update(vuln_summary) + + return result -def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False, filtering_digests=None): +def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False, filtering_digests=None, fetch_vulns=False): """Fetches the manifest list for a tag and returns a list of data dicts.""" manifest_url = f"{CLOUDSMITH_URL}/v2/{workspace}/{repo}/{img}/manifests/{ntag}" @@ -316,10 +500,6 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False # Parse out digests and platforms is_list = 'manifests' in manifest_json - # Removed to allow single images to be processed by default - # if not is_list and not include_all: - # return [] - children = [] if is_list: if 'manifests' in manifest_json: @@ -330,7 +510,6 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False arch = p.get('architecture', 'unknown') plat = f"{os_name}/{arch}" - # Removed strict arch check to prevent filtering valid images with missing metadata if d: children.append({'digest': d, 'platform': plat}) else: @@ -349,6 +528,7 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False parent_status = "Unknown" index_digest = "" slug = "" + slug_perm = "" parent_platform = "multi" if is_list else "unknown" total_downloads = 0 @@ -357,6 +537,7 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False parent_status = pkg.get('status_str', 'Unknown') 
uploaded_at = pkg.get('uploaded_at') slug = pkg.get('slug', '') + slug_perm = pkg.get('slug_perm', '') ver = pkg.get('version', '') if ver and not ver.startswith('sha256:'): index_digest = f"sha256:{ver}" @@ -372,15 +553,12 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False # Apply Filtering if active parent_matched = True if filtering_digests is not None: - # Check if parent digest is in filter - # Handle potential prefix differences (sha256: vs raw) raw_index = index_digest.replace("sha256:", "") norm_index = index_digest if index_digest.startswith("sha256:") else f"sha256:{index_digest}" parent_matched = (index_digest in filtering_digests) or (raw_index in filtering_digests) or (norm_index in filtering_digests) if not parent_matched: - # Parent didn't match, so filter children to find matches filtered_children = [] for c in children: d = c['digest'] @@ -390,21 +568,22 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False filtered_children.append(c) if not filtered_children: - # No match in parent or ANY children -> exclude this tag return [] - # Update children list to only matched children children = filtered_children # Process children children_data = [] derived_status = None + children_vulns = [] # collect vulnerability summaries for rollup if is_list: for child in children: - data = get_digest_data(workspace, repo, img, child['digest'], ntag, platform=child['platform']) + data = get_digest_data(workspace, repo, img, child['digest'], ntag, platform=child['platform'], fetch_vulns=fetch_vulns) children_data.append(data) total_downloads += data['downloads'] + if fetch_vulns: + children_vulns.append(data) # Check quarantine status of children if children_data: @@ -416,22 +595,30 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False elif quarantined_count > 0: derived_status = "Partial Quarantine" - # Override parent status if derived from children if derived_status: 
parent_status = derived_status + # Vulnerability rollup for parent + parent_vuln = empty_vuln_summary() + if fetch_vulns: + if is_list and children_vulns: + # Roll up children vulnerabilities into parent + parent_vuln = rollup_vuln_summaries(children_vulns) + elif not is_list and slug_perm: + # Single image: fetch directly + parent_vuln = fetch_vulnerability_data(workspace, repo, slug_perm) + elif is_list and not children_vulns: + parent_vuln["vuln_status"] = "Not Supported" + # Fallback: Fetch config blob for single images to determine platform if not is_list and (parent_platform == "unknown" or not parent_platform): - # 1. Check for Schema 1 top-level architecture if 'architecture' in manifest_json: parent_platform = f"{manifest_json.get('os', 'linux')}/{manifest_json.get('architecture')}" else: - # 2. Fetch Config Blob (Schema 2) cfg = manifest_json.get('config', {}) cfg_digest = cfg.get('digest') if cfg_digest: blob_url = f"{CLOUDSMITH_URL}/v2/{workspace}/{repo}/{img}/blobs/{cfg_digest}" - # Config blob is JSON blob_data = make_request(blob_url) if blob_data: b_os = blob_data.get('os', 'linux') @@ -441,7 +628,7 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False results = [] # Parent Data - results.append({ + parent_row = { "tag": ntag, "type": "manifest/list" if is_list else "image", "platform": parent_platform, @@ -450,11 +637,13 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False "downloads": total_downloads, "digest": index_digest, "is_child": False, - "slug": slug - }) + "slug": slug, + "slug_perm": slug_perm, + } + parent_row.update(parent_vuln) + results.append(parent_row) # Children Data - # Force show children if we are in filtering mode and matched children (context is important) show_children = detailed if filtering_digests is not None and not parent_matched and children_data: show_children = True @@ -464,7 +653,7 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, 
include_all=False return results -def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): +def fetch_untagged_data(pkg, workspace, repo, img, detailed=False, fetch_vulns=False): digest = pkg.get('version') if digest and not digest.startswith('sha256:'): digest = f"sha256:{digest}" @@ -472,6 +661,7 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): status = pkg.get('status_str') downloads = pkg.get('downloads', 0) slug = pkg.get('slug') + slug_perm = pkg.get('slug_perm', '') pkg_type = pkg.get('type_display', 'image') platform_str = "unknown" @@ -502,7 +692,6 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): child_digests = [] if manifest_json: - # If we have a manifest, we prefer its data over the API summary manifest_platform = "unknown" if 'manifests' in manifest_json: @@ -519,18 +708,14 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): child_digests.append({'digest': m['digest'], 'platform': plat}) manifest_platform = " ".join(sorted(list(archs))) else: - # Single image (orphan) - # Check Schema 1/2 top level if 'architecture' in manifest_json: manifest_platform = f"{manifest_json.get('os', 'linux')}/{manifest_json.get('architecture')}" if manifest_platform == "unknown" or manifest_platform.endswith("/unknown"): found_archs = find_key_recursive(manifest_json, 'architecture') if found_archs: - # This might find 'amd64', need to guess OS manifest_platform = f"linux/{found_archs[0]}" - # Check Config Blob (Schema 2) if manifest_platform == "unknown" or manifest_platform.endswith("/unknown"): cfg = manifest_json.get('config', {}) cfg_digest = cfg.get('digest') @@ -555,6 +740,10 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): if d_uploaded: uploaded_at = d_uploaded + # Pick up slug_perm from details if not already set + if not slug_perm: + slug_perm = details.get('slug_perm', '') + d_arch = details.get('architecture') if d_arch and d_arch != 'unknown': platform_str = 
d_arch @@ -574,8 +763,29 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): tag_display = "(untagged)" if pkg_type == "manifest/list" else "(orphan)" + # Vulnerability data + parent_vuln = empty_vuln_summary() + children_vulns = [] + results = [] - results.append({ + + if detailed and child_digests: + for child in child_digests: + row = get_digest_data(workspace, repo, img, child['digest'], tag_display, platform=child['platform'], fetch_vulns=fetch_vulns) + results.append(row) + if fetch_vulns: + children_vulns.append(row) + + # Compute parent vulnerability rollup + if fetch_vulns: + if child_digests and children_vulns: + parent_vuln = rollup_vuln_summaries(children_vulns) + elif slug_perm: + parent_vuln = fetch_vulnerability_data(workspace, repo, slug_perm) + else: + parent_vuln["vuln_status"] = "Not Supported" + + parent_row = { "tag": tag_display, "type": pkg_type, "platform": platform_str, @@ -584,19 +794,21 @@ def fetch_untagged_data(pkg, workspace, repo, img, detailed=False): "downloads": downloads, "digest": digest, "is_child": False, - "slug": slug # Internal use - }) + "slug": slug, + "slug_perm": slug_perm, + } + parent_row.update(parent_vuln) + + # Insert parent at the beginning + final_results = [parent_row] + final_results.extend(results) if detailed and child_digests: - for child in child_digests: - # FIX: get_digest_data returns a dict, not a tuple - row = get_digest_data(workspace, repo, img, child['digest'], tag_display, platform=child['platform']) - results.append(row) - results.append("SECTION") + final_results.append("SECTION") - return results, slug + return final_results, slug -def get_untagged_images(workspace, repo, img, delete=False, detailed=False, progress=None): +def get_untagged_images(workspace, repo, img, delete=False, detailed=False, progress=None, fetch_vulns=False): # Helper to handle pagination def fetch_all_pages(start_url): results = [] @@ -610,21 +822,17 @@ def fetch_all_pages(start_url): if isinstance(data, 
list): results.extend(data) - # Check Link header for next page - # Format: ; rel="next", ; rel="last" next_url = None link_header = headers.get('Link') if link_header: for link in link_header.split(','): parts = link.split(';') if len(parts) >= 2 and 'rel="next"' in parts[1]: - # Extract URL from next_url = parts[0].strip().strip('<>') break return results api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/" - # Added page_size=100 for efficiency query = urlencode({'query': f"name:{img}", 'page_size': 100}) full_url = f"{api_url}?{query}" @@ -643,7 +851,6 @@ def fetch_all_pages(start_url): if p_type == 'manifest/list': all_manifest_lists.append(p) elif not tags: - # It's an image (not a list) and has no tags potential_orphans.append(p) # 1. Identify Untagged Manifest Lists @@ -652,15 +859,12 @@ def fetch_all_pages(start_url): # 2. Identify Orphaned Images (Untagged & Not Referenced) orphaned_pkgs = [] if potential_orphans: - # We need to build a set of all referenced digests from ALL manifest lists referenced_digests = set() if progress: progress.console.print(f"[dim]Checking {len(all_manifest_lists)} manifest lists for references...[/dim]") - # We can fetch these in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - # We need the digest for the manifest list itself to fetch its manifest futures = [] for ml in all_manifest_lists: digest = ml.get('version') @@ -675,7 +879,6 @@ def fetch_all_pages(start_url): except Exception: pass - # Filter potential orphans for p in potential_orphans: digest = p.get('version') if digest and not digest.startswith('sha256:'): @@ -691,7 +894,6 @@ def fetch_all_pages(start_url): logger.info(f"Found {len(untagged_pkgs)} untagged lists and {len(orphaned_pkgs)} orphaned images for: {img}") - # Fetch data first results_map = {} packages_to_delete = [] @@ -700,7 +902,7 @@ def fetch_all_pages(start_url): task_id = progress.add_task(f"[cyan]Analyzing {img}[/cyan] ({len(targets)} items)", 
total=len(targets)) with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - futures = {executor.submit(fetch_untagged_data, pkg, workspace, repo, img, detailed): i for i, pkg in enumerate(targets)} + futures = {executor.submit(fetch_untagged_data, pkg, workspace, repo, img, detailed, fetch_vulns=fetch_vulns): i for i, pkg in enumerate(targets)} for future in concurrent.futures.as_completed(futures): index = futures[future] try: @@ -716,19 +918,16 @@ def fetch_all_pages(start_url): if progress and task_id is not None: progress.remove_task(task_id) - # Perform Deletion if requested deleted_slugs = set() failed_slugs = set() if delete and packages_to_delete: deleted_slugs, failed_slugs = batch_delete_packages(workspace, repo, packages_to_delete) - # Build Result Groups groups = [] for i in range(len(targets)): if i in results_map: rows, slug = results_map[i] - # Update action status action_str = "" if delete: if slug in deleted_slugs: @@ -739,17 +938,13 @@ def fetch_all_pages(start_url): for row in rows: if isinstance(row, dict): row['action'] = action_str - # Remove internal slug if 'slug' in row: del row['slug'] groups.append(rows) return groups -# filepath: /Users/cmoynes/dev/support-engineering/Docker/Sonar/sonar.py -# --- Core Logic --- - -def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, include_all=False, query_filter=None): +def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, include_all=False, query_filter=None, fetch_vulns=False): # Helper to handle pagination def fetch_all_pages(start_url): results = [] @@ -763,27 +958,21 @@ def fetch_all_pages(start_url): if isinstance(data, list): results.extend(data) - # Check Link header for next page - # Format: ; rel="next", ; rel="last" next_url = None link_header = headers.get('Link') if link_header: for link in link_header.split(','): parts = link.split(';') if 
len(parts) >= 2 and 'rel="next"' in parts[1]: - # Extract URL from next_url = parts[0].strip().strip('<>') break return results - # Fetch all tags (including untagged if requested, but logic handled separately) - query_parts = [f"name:{img_name}"] if query_filter: query_parts.append(query_filter) full_query = " AND ".join(query_parts) - # Add page_size parameter qs = urlencode({'query': full_query, 'page_size': 100}) api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{qs}" @@ -791,7 +980,6 @@ def fetch_all_pages(start_url): download_criteria = [] - # Retry logic: If no results directly, try removing download constraints and filtering client-side if not packages and query_filter and 'downloads' in query_filter: stripped_filter, criteria = parse_filter_criteria(query_filter) if criteria: @@ -811,30 +999,24 @@ def fetch_all_pages(start_url): logger.info(f"No packages found for image: {img_name}") return None - # Extract tags and matched digests (targets) tags = set() matched_digests = set() for pkg in packages: - # Collect tags pkg_tags = pkg.get('tags', {}).get('version', []) for t in pkg_tags: tags.add(t) - # Collect digests (often the 'version' field in API for images) v = pkg.get('version') if v: - # Normalize to sha256: for consistent strings if not v.startswith('sha256:') and len(v) == 64: matched_digests.add(f"sha256:{v}") else: matched_digests.add(v) - # If filter matched packages (e.g. children) but no direct tags were found, we need to find their parents. if packages and not tags: logger.info(f"Filter matched {len(packages)} objects but no direct tags. 
Performing parent lookup...") - # Search for ALL manifest lists for this image to identify parents of the matched digests all_ml_query = urlencode({'query': f"name:{img_name} AND NOT architecture:**", 'page_size': 100}) api_ml_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{all_ml_query}" all_manifests = fetch_all_pages(api_ml_url) @@ -853,7 +1035,6 @@ def fetch_all_pages(start_url): groups = [] - # Create filtering set (if valid filter) filtering_set = matched_digests if query_filter else None task_id = None @@ -861,8 +1042,7 @@ def fetch_all_pages(start_url): task_id = progress.add_task(f"[cyan]Analyzing {img_name}[/cyan] ({len(sorted_tags)} tags)", total=len(sorted_tags)) with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - # Pass filtering_set to fetch_tag_data - future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, include_all, filtering_set): t for t in sorted_tags} + future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, include_all, filtering_set, fetch_vulns=fetch_vulns): t for t in sorted_tags} results = {} for future in concurrent.futures.as_completed(future_to_tag): @@ -876,7 +1056,6 @@ def fetch_all_pages(start_url): if progress and task_id is not None: progress.advance(task_id) - # Group by digest digest_map = {} for t in sorted_tags: @@ -886,12 +1065,10 @@ def fetch_all_pages(start_url): digest = parent.get('digest') if digest in digest_map: - # Append tag to existing entry existing_group = digest_map[digest] existing_parent = existing_group[0] existing_parent['tag'] = f"{existing_parent['tag']}, {t}" else: - # New entry digest_map[digest] = group groups = list(digest_map.values()) @@ -905,7 +1082,6 @@ def fetch_all_pages(start_url): if not group: continue parent = group[0] - # Modified to allow deletion of both manifest lists AND single images if parent.get('type') in ['manifest/list', 'image']: should_delete = False tags = parent.get('tag', 
'').split(', ') @@ -927,7 +1103,6 @@ def fetch_all_pages(start_url): if packages_to_delete: deleted_slugs, failed_slugs = batch_delete_packages(workspace, repo, packages_to_delete) - # Update Action Status in Groups for group in groups: if not group: continue parent = group[0] @@ -941,7 +1116,6 @@ def fetch_all_pages(start_url): if action_str: parent['action'] = action_str - # Optionally propagate to children if needed, but usually just parent row for row in group: if isinstance(row, dict): row['action'] = action_str @@ -963,23 +1137,21 @@ def process_image(org, repo, img_name, args, progress=None): # 1. Untagged Analysis if args.untagged or args.untagged_delete: - untagged_groups = get_untagged_images(org, repo, img_name, delete=args.untagged_delete, detailed=args.detailed, progress=progress) + untagged_groups = get_untagged_images(org, repo, img_name, delete=args.untagged_delete, detailed=args.detailed, progress=progress, fetch_vulns=args.vulnerabilities) if untagged_groups: results.extend(untagged_groups) # 2. 
Standard/Tagged Analysis - # Run if: - # - No untagged flags are set (Default behavior) should_run_standard = (not (args.untagged or args.untagged_delete)) if should_run_standard: - tagged_groups = get_image_analysis(org, repo, img_name, delete_all=args.delete_all, delete_tag=args.delete_tag, detailed=args.detailed, progress=progress, query_filter=args.filter) + tagged_groups = get_image_analysis(org, repo, img_name, delete_all=args.delete_all, delete_tag=args.delete_tag, detailed=args.detailed, progress=progress, query_filter=args.filter, fetch_vulns=args.vulnerabilities) if tagged_groups: results.extend(tagged_groups) return results -def render_table(image_name, groups, is_untagged=False, has_action=False): +def render_table(image_name, groups, is_untagged=False, has_action=False, show_vulns=False): # --- Table Setup --- table = Table(title=f"{'Untagged' if is_untagged else 'Tagged'} Image Analysis: {image_name}", box=box.ROUNDED) table.add_column("Tag", style="cyan") @@ -989,20 +1161,40 @@ def render_table(image_name, groups, is_untagged=False, has_action=False): table.add_column("Uploaded") table.add_column("Downloads", justify="right") table.add_column("Digest", style="dim") + if show_vulns: + table.add_column("Vuln Scan") + table.add_column("C", justify="right", style="bold red") + table.add_column("H", justify="right", style="red") + table.add_column("M", justify="right", style="yellow") + table.add_column("L", justify="right", style="green") if has_action: table.add_column("Action", style="bold red") # --- Row Rendering --- + def vuln_cells(row): + """Returns vulnerability cells for a row.""" + vuln_status = format_vuln_status(row.get("vuln_status")) + critical = row.get("critical", 0) + high = row.get("high", 0) + medium = row.get("medium", 0) + low = row.get("low", 0) + + # Highlight non-zero counts + c_str = f"[bold red]{critical}[/bold red]" if critical > 0 else f"[dim]{critical}[/dim]" + h_str = f"[red]{high}[/red]" if high > 0 else 
f"[dim]{high}[/dim]" + m_str = f"[yellow]{medium}[/yellow]" if medium > 0 else f"[dim]{medium}[/dim]" + l_str = f"[green]{low}[/green]" if low > 0 else f"[dim]{low}[/dim]" + + return [vuln_status, c_str, h_str, m_str, l_str] + for group in groups: if not group: continue parent = group[0] - # Action string for delete status action_str = parent.get('action', "") - # Parent Row if is_untagged: - table.add_row( + row_data = [ parent.get("tag", ""), parent.get("type", ""), parent.get("platform", ""), @@ -1010,8 +1202,12 @@ def render_table(image_name, groups, is_untagged=False, has_action=False): parent.get("uploaded", ""), f"[bold cyan]{parent.get('downloads', 0)}[/bold cyan]", f"[dim]{parent.get('digest', '')}[/dim]", - action_str if has_action else None - ) + ] + if show_vulns: + row_data.extend(vuln_cells(parent)) + if has_action: + row_data.append(action_str) + table.add_row(*row_data) else: row_data = [ f"[bold cyan]{parent.get('tag', '')}[/bold cyan]", @@ -1022,6 +1218,8 @@ def render_table(image_name, groups, is_untagged=False, has_action=False): f"[bold cyan]{parent.get('downloads', 0)}[/bold cyan]", f"[bold cyan]{parent.get('digest', '')}[/bold cyan]" ] + if show_vulns: + row_data.extend(vuln_cells(parent)) if has_action: row_data.append(action_str) @@ -1042,6 +1240,8 @@ def render_table(image_name, groups, is_untagged=False, has_action=False): f"[dim]{row.get('downloads', 0)}[/dim]", f"[dim]{row.get('digest', '')}[/dim]" ] + if show_vulns: + row_data.extend(vuln_cells(row)) if has_action: row_data.append(row.get('action', '')) @@ -1051,7 +1251,6 @@ def render_table(image_name, groups, is_untagged=False, has_action=False): def main(): - # Parse args first to configure logging parser = argparse.ArgumentParser(description="Sonar - A Docker image inspector for Cloudsmith.") parser.add_argument("org", help="Cloudsmith Organization/User") parser.add_argument("repo", help="Cloudsmith Repository") @@ -1062,6 +1261,7 @@ def main(): parser.add_argument("--delete-tag", 
help="Delete manifest lists matching this specific tag") parser.add_argument("--filter", help="Filter packages using Cloudsmith search syntax (e.g. 'version:^1.0' or 'tag:latest')") parser.add_argument("--detailed", action="store_true", help="Show detailed breakdown of digests") + parser.add_argument("--vulnerabilities", action="store_true", help="Show vulnerability scan status and CVE severity summary") parser.add_argument("--output", choices=['table', 'json'], default='table', help="Output format (default: table)") parser.add_argument("--debug-log", action="store_true", help="Enable debug logging to file") parser.add_argument("--force", action="store_true", help="Force deletion without interactive prompt") @@ -1119,7 +1319,6 @@ def fetch_paginated_results(start_url, is_catalog=False): next_url = None if is_catalog: - # Docker Registry API (_catalog) only uses Link headers link_header = headers.get('Link') if link_header: for link in link_header.split(','): @@ -1128,7 +1327,6 @@ def fetch_paginated_results(start_url, is_catalog=False): next_url = parts[0].strip().strip('<>') break else: - # Cloudsmith API v1 uses X-Pagination headers x_page = headers.get('X-Pagination-Page') x_total = headers.get('X-Pagination-Total-Pages') @@ -1138,7 +1336,6 @@ def fetch_paginated_results(start_url, is_catalog=False): total_pages = int(x_total) if current_page < total_pages: - # Construct next page URL using start_url as base to avoid drift scheme, netloc, path, params, query, fragment = urlparse(start_url) query_params = parse_qs(query) query_params['page'] = [str(current_page + 1)] @@ -1148,7 +1345,6 @@ def fetch_paginated_results(start_url, is_catalog=False): except ValueError: pass - # Fallback to Link header if X-Pagination headers are missing if not next_url: link_header = headers.get('Link') if link_header: @@ -1166,8 +1362,6 @@ def fetch_paginated_results(start_url, is_catalog=False): console.print(f"[bold]Searching for packages matching filter: 
'{args.filter}'...[/bold]") logger.info(f"Searching for packages matching filter: {args.filter}") - # Search for packages to identify which images to scan - # We append format:docker to ensure we only get docker images search_query = f"{args.filter} AND format:docker" full_query = urlencode({'query': search_query, 'page_size': 100}) search_url = f"https://api.cloudsmith.io/v1/packages/{args.org}/{args.repo}/?{full_query}" @@ -1196,7 +1390,6 @@ def fetch_paginated_results(start_url, is_catalog=False): if args.output == 'table': console.print(f"[yellow]{msg}[/yellow]") logger.info(msg) - # Exit cleanly if nothing found sys.exit(0) else: if args.output == 'table': @@ -1226,7 +1419,6 @@ def fetch_paginated_results(start_url, is_catalog=False): console=console ) else: - # Dummy context manager for non-table output class DummyProgress: def __enter__(self): return self def __exit__(self, *args): pass @@ -1234,7 +1426,7 @@ def add_task(self, *args, **kwargs): return None def advance(self, *args, **kwargs): pass def remove_task(self, *args, **kwargs): pass @property - def console(self): return console # fallback + def console(self): return console progress_ctx = DummyProgress() collected_results = [] @@ -1243,7 +1435,6 @@ def console(self): return console # fallback if args.output == 'table': task = progress.add_task(f"Processing {len(images_to_scan)} images...", total=len(images_to_scan)) - # Use a reasonable number of workers for images (e.g., 5) executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) try: future_to_img = { @@ -1265,15 +1456,12 @@ def console(self): return console # fallback if args.output == 'table': progress.advance(task) - # Normal shutdown executor.shutdown(wait=True) except KeyboardInterrupt: - # Force shutdown without waiting executor.shutdown(wait=False, cancel_futures=True) raise - # Sort results by image name and print collected_results.sort(key=lambda x: x[0]) if not collected_results: @@ -1290,16 +1478,20 @@ def console(self): return 
console # fallback for img_name, groups in collected_results: is_untagged = args.untagged or args.untagged_delete - # If showing both, prefer the standard table view if is_untagged: is_untagged = False has_action = args.untagged_delete or args.delete_all or (args.delete_tag is not None) - table = render_table(image_name=img_name, groups=groups, is_untagged=is_untagged, has_action=has_action) + table = render_table( + image_name=img_name, + groups=groups, + is_untagged=is_untagged, + has_action=has_action, + show_vulns=args.vulnerabilities, + ) console.print(table) console.print("") elif args.output == 'json': - # JSON Output for all images all_results = {} for img_name, groups in collected_results: all_results[img_name] = groups @@ -1307,16 +1499,18 @@ def console(self): return console # fallback json_output = json.dumps(all_results, indent=2) console.print(json_output) elif args.output == 'csv': - # CSV Output (simple flat structure) csv_lines = [] - csv_lines.append(["Image", "Tag", "Type", "Platform", "Status", "Uploaded", "Downloads", "Digest", "Action"]) # Header + header = ["Image", "Tag", "Type", "Platform", "Status", "Uploaded", "Downloads", "Digest"] + if args.vulnerabilities: + header.extend(["Vuln Status", "Critical", "High", "Medium", "Low"]) + header.append("Action") + csv_lines.append(header) for img_name, groups in collected_results: for group in groups: if group == "SECTION": continue - # Flat CSV row - csv_lines.append([ + row = [ img_name, group.get("tag", ""), group.get("type", ""), @@ -1325,10 +1519,18 @@ def console(self): return console # fallback group.get("uploaded", ""), str(group.get("downloads", 0)), group.get("digest", ""), - group.get("action", "") - ]) + ] + if args.vulnerabilities: + row.extend([ + group.get("vuln_status", ""), + str(group.get("critical", 0)), + str(group.get("high", 0)), + str(group.get("medium", 0)), + str(group.get("low", 0)), + ]) + row.append(group.get("action", "")) + csv_lines.append(row) - # Print CSV for line 
in csv_lines: console.print(",".join(f'"{str(item)}"' for item in line)) @@ -1337,5 +1539,4 @@ def console(self): return console # fallback main() except KeyboardInterrupt: console.print("\n[bold red]Operation cancelled by user.[/bold red]") - # Use os._exit to avoid hanging on shutdown os._exit(0) \ No newline at end of file From a26e93b6e8d60ba29f4aabe9429c176d9bc5b098 Mon Sep 17 00:00:00 2001 From: colinmoynes Date: Fri, 20 Mar 2026 15:05:00 +0000 Subject: [PATCH 2/2] changelog --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f5aa80..6d70afb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,14 @@ ## [Unreleased] +## [Sonar] [v1.3] [2026-03-20] + +### Added +- Added `--vulnerabilities` flag — when set, fetches and displays vulnerability scan status and CVE severity summary (Critical, High, Medium, Low) for each package in the output. +- Rolls up child image vulnerability data into the parent manifest list. +- Rich-formatted display for scan status (Scanned, Not Supported, Scanning, etc.). +- Vulnerability fields included in all output formats. + ## [Sonar] [v1.2] [2026-01-14] ### Added