From b6d51409d5feffdd60a27071cf42ac8831fd9a02 Mon Sep 17 00:00:00 2001 From: Eli Dross Date: Mon, 18 Nov 2024 16:25:03 -0500 Subject: [PATCH 1/5] created helper_function.py --- README.md | 3 ++ .../anon_deactivated_users.py | 13 ++++--- archive-stale-scans/archive_stale_scans.py | 38 +++++++++--------- helper_functions.py | 32 +++++++++++++++ post-scan-gates/post_scan_gates.py | 39 ++++++++++--------- post-scan-reports/post_scan_reports.py | 36 ++++++++--------- quick-scan/quick_scan.py | 33 ++++++++-------- 7 files changed, 116 insertions(+), 78 deletions(-) create mode 100644 helper_functions.py diff --git a/README.md b/README.md index bf1c610..4150416 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,9 @@ This script downloads reports for a scan - useful if you want to include FossID ### Quickly Scan a File This script scans a single file using the Quick Scan API - helpful for quickly knowing if AI-generated code should be investigated further. +### Helper Functions +This script contains helper functions that are used in the example scripts + ## Contributing Contributions are welcome! We'll review any Pull Requests made. diff --git a/anon-deactivated-users/anon_deactivated_users.py b/anon-deactivated-users/anon_deactivated_users.py index 1f7b80a..67bab72 100644 --- a/anon-deactivated-users/anon_deactivated_users.py +++ b/anon-deactivated-users/anon_deactivated_users.py @@ -3,22 +3,23 @@ import logging import argparse import os +import helper_functions as hf # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Create a session object for making requests -session = requests.Session() +#session = requests.Session() # Helper function to make API calls -def make_api_call(url: str, payload: dict) -> dict: +"""def make_api_call(url: str, payload: dict) -> dict: try: response = session.post(url, json=payload) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logging.error(f"API call failed: {e}") - raise + raise""" # Function to get all users def get_all_users(api_url, api_username, api_token): @@ -31,8 +32,8 @@ def get_all_users(api_url, api_username, api_token): "include_deactivated": "1" } } - logging.debug(make_api_call(api_url, payload)) - return make_api_call(api_url, payload) + logging.debug(hf.make_api_call(api_url, payload)) + return hf.make_api_call(api_url, payload) # Function to update user information def update_user(api_url, api_username, api_token, user_username, user_name, user_surname, user_email, user_password): @@ -49,7 +50,7 @@ def update_user(api_url, api_username, api_token, user_username, user_name, user "user_password": user_password } } - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def main(api_base_url, api_username, api_token, dry_run): # Ensure the API URL ends with /api.php diff --git a/archive-stale-scans/archive_stale_scans.py b/archive-stale-scans/archive_stale_scans.py index d5c08f4..829f50d 100644 --- a/archive-stale-scans/archive_stale_scans.py +++ b/archive-stale-scans/archive_stale_scans.py @@ -13,7 +13,7 @@ import argparse import os from typing import List, Tuple, Dict, Any - +import helper_functions as hf import requests from tabulate import tabulate @@ -23,23 +23,23 @@ ) # Create a session object for making requests -session = requests.Session() +#session = requests.Session() -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise +#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: +# """Helper function to make API calls.""" +# try: +# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) +# response = requests.post(url, json=payload, timeout=10) +# response.raise_for_status() +# logging.debug("Received response: %s", response.text) +# return response.json().get("data", {}) +# except requests.exceptions.RequestException as e: +# logging.error("API call failed: %s", str(e)) +# raise +# except json.JSONDecodeError as e: +# logging.error("Failed to parse JSON response: %s", str(e)) +# raise def list_scans(url: str, username: str, token: str) -> Dict[str, Any]: @@ -49,7 +49,7 @@ def list_scans(url: str, username: str, token: str) -> Dict[str, Any]: "action": "list_scans", "data": {"username": username, "key": token}, } - return make_api_call(url, payload) + return hf.make_api_call(url, payload) def get_scan_info( @@ -61,7 +61,7 @@ def get_scan_info( "action": "get_information", "data": {"username": username, "key": token, "scan_code": scan_code}, } - return make_api_call(url, payload) + return hf.make_api_call(url, payload) def get_project_info( @@ -73,7 +73,7 @@ def get_project_info( "action": "get_information", "data": {"username": username, "key": token, "project_code": project_code}, } - return make_api_call(url, payload) + return hf.make_api_call(url, payload) def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: diff --git a/helper_functions.py b/helper_functions.py new file mode 100644 index 0000000..09981c0 --- /dev/null +++ b/helper_functions.py @@ -0,0 +1,32 @@ +import json +import time +import logging +import argparse +import os +import re +from typing import Dict, Any +import sys +import requests + +def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: + """Helper function to make API calls. + + Parameters: + url(str): The url to access the API. + payload(Dict[str, Any]): A dictionary that contains the data required by the API. + + Returns: + Returns a dicitionary with the API response data + """ + try: + logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) + response = requests.post(url, json=payload, timeout=10) + response.raise_for_status() + logging.debug("Received response: %s", response.text) + return response.json().get("data", {}) + except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) + raise + except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) + raise \ No newline at end of file diff --git a/post-scan-gates/post_scan_gates.py b/post-scan-gates/post_scan_gates.py index 51cc04f..5275a0c 100644 --- a/post-scan-gates/post_scan_gates.py +++ b/post-scan-gates/post_scan_gates.py @@ -20,6 +20,7 @@ from typing import Dict, Any import sys import requests +import helper_functions as hf # Constants API_ACTION_CHECK_STATUS = "check_status" @@ -32,23 +33,23 @@ ) # Create a session object for making requests -session = requests.Session() +#session = requests.Session() -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise +#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: +# """Helper function to make API calls.""" +# try: +# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) +# response = requests.post(url, json=payload, timeout=10) +# response.raise_for_status() +# logging.debug("Received response: %s", response.text) +# return response.json().get("data", {}) +# except requests.exceptions.RequestException as e: +# logging.error("API call failed: %s", str(e)) +# raise +# except json.JSONDecodeError as e: +# logging.error("Failed to parse JSON response: %s", str(e)) +# raise def check_scan_status( @@ -56,7 +57,7 @@ def check_scan_status( ) -> Dict[str, Any]: """Check the status of the scan.""" payload = create_payload(username, token, scan_code, API_ACTION_CHECK_STATUS) - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def check_pending_identifications( @@ -64,7 +65,7 @@ def check_pending_identifications( ) -> Dict[str, Any]: """Check for pending identifications in the scan.""" payload = create_payload(username, token, scan_code, API_ACTION_GET_PENDING_FILES) - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def check_policy_violations( @@ -72,7 +73,7 @@ def check_policy_violations( ) -> Dict[str, Any]: """Check for policy violations in the scan.""" payload = create_payload(username, token, scan_code, API_ACTION_GET_POLICY_WARNINGS) - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def create_payload( @@ -177,7 +178,7 @@ def get_scan_information( ) -> Dict[str, Any]: """Get scan information from the API.""" payload = create_payload(username, token, scan_code, "get_information") - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def main(): diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index 6d6f572..939c527 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -14,7 +14,7 @@ import argparse import os from typing import Dict, Any, Tuple - +import helper_functions as hf import requests # Configure logging @@ -23,7 +23,7 @@ ) # Create a session object for making requests -session = requests.Session() +#session = requests.Session() # List of all report types REPORT_TYPES = [ @@ -46,22 +46,22 @@ def __init__(self, url: str, username: str, token: str): self.username = username self.token = token - def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug( - "Making API call with payload: %s", json.dumps(payload, indent=2) - ) - response = session.post(self.url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise +# def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: +# """Helper function to make API calls.""" +# try: +# logging.debug( +# "Making API call with payload: %s", json.dumps(payload, indent=2) +# ) +# response = session.post(self.url, json=payload, timeout=10) +# response.raise_for_status() +# logging.debug("Received response: %s", response.text) +# return response.json().get("data", {}) +# except requests.exceptions.RequestException as e: +# logging.error("API call failed: %s", str(e)) +# raise +# except json.JSONDecodeError as e: +# logging.error("Failed to parse JSON response: %s", str(e)) +# raise def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, Any]: """Check Workbench scan status.""" diff --git a/quick-scan/quick_scan.py b/quick-scan/quick_scan.py index 9a8fccf..b1e861d 100644 --- a/quick-scan/quick_scan.py +++ b/quick-scan/quick_scan.py @@ -12,6 +12,7 @@ import os from typing import Dict, Any import requests +import helper_functions as hf # Configure logging logging.basicConfig( @@ -19,23 +20,23 @@ ) # Create a session object for making requests -session = requests.Session() +#session = requests.Session() -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise +#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: +# """Helper function to make API calls.""" +# try: +# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) +# response = requests.post(url, json=payload, timeout=10) +# response.raise_for_status() +# logging.debug("Received response: %s", response.text) +# return response.json().get("data", {}) +# except requests.exceptions.RequestException as e: +# logging.error("API call failed: %s", str(e)) +# raise +# except json.JSONDecodeError as e: +# logging.error("Failed to parse JSON response: %s", str(e)) +# raise def quick_scan( @@ -53,7 +54,7 @@ def quick_scan( "sensitivity": "10", }, } - return make_api_call(api_url, payload) + return hf.make_api_call(api_url, payload) def format_scan_result(result_data: Dict[str, Any], quick_view_link: str) -> str: From 0516543c0ea0f3bff5c66dc60a9622a5e33c1a78 Mon Sep 17 00:00:00 2001 From: Eli Dross Date: Mon, 18 Nov 2024 16:26:49 -0500 Subject: [PATCH 2/5] fixed session error --- archive-stale-scans/archive_stale_scans.py | 2 +- post-scan-reports/post_scan_reports.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/archive-stale-scans/archive_stale_scans.py b/archive-stale-scans/archive_stale_scans.py index 829f50d..388a56e 100644 --- a/archive-stale-scans/archive_stale_scans.py +++ b/archive-stale-scans/archive_stale_scans.py @@ -84,7 +84,7 @@ def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: "data": {"username": username, "key": token, "scan_code": scan_code}, } try: - response = session.post(url, json=payload) + response = requests.post(url, json=payload) response.raise_for_status() return response.status_code == 200 except requests.exceptions.RequestException as e: diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index 939c527..c93a663 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -131,7 +131,7 @@ def download_report( "process_id": process_queue_id, }, } - response = session.post(self.url, json=payload, timeout=120) + response = requests.post(self.url, json=payload, timeout=120) response.raise_for_status() file_extension = { From 0cf04b60eea53519314d09fd4a5315265fc1fe31 Mon Sep 17 00:00:00 2001 From: Eli Dross Date: Wed, 20 Nov 2024 14:01:51 -0500 Subject: [PATCH 3/5] worked on code for anon and archive --- README.md | 4 + .../anon_deactivated_users.py | 69 ++++--- archive-stale-scans/archive_stale_scans.py | 170 ++++++++++++------ post-scan-gates/post_scan_gates.py | 34 ++-- post-scan-reports/post_scan_reports.py | 33 ++-- quick-scan/quick_scan.py | 34 ++-- 6 files changed, 210 insertions(+), 134 deletions(-) diff --git a/README.md b/README.md index 4150416..9748abc 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,10 @@ These scripts were built by the FossID Customer Experience teams in collaboratio These examples demonstrate how to interact with the Workbench API. We do our best to keep the examples updated, but there is no long-term maintainer for this code. We do not use GitHub Issues - for questions or issues with the scripts please use the [FossID Support Portal](https://support.fossid.com/). Thank you! ## Example Scripts +Each script interacts with the FOSSID workbench API. To use our API a payload dictionary is required for each API call. +Each different API call has a different payload with different requirments. To see these requirements, you can look in the +scripts to see each required key for the payload or view it on API website. + The repo has scripts that help you: ### Archive Old Scans diff --git a/anon-deactivated-users/anon_deactivated_users.py b/anon-deactivated-users/anon_deactivated_users.py index 67bab72..5cc3eb8 100644 --- a/anon-deactivated-users/anon_deactivated_users.py +++ b/anon-deactivated-users/anon_deactivated_users.py @@ -8,41 +8,51 @@ # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -# Create a session object for making requests -#session = requests.Session() +def get_all_users(url, username, token): + """ Gets all users in the system -# Helper function to make API calls -"""def make_api_call(url: str, payload: dict) -> dict: - try: - response = session.post(url, json=payload) - response.raise_for_status() - return response.json() - except requests.exceptions.RequestException as e: - logging.error(f"API call failed: {e}") - raise""" - -# Function to get all users -def get_all_users(api_url, api_username, api_token): + Parameters: + url(str): The link to acess the api. + username(str): The username to use to access the api. + token(str): The token required to access the api. + + Returns: + A dictionary with every user. + """ payload = { "group": "users", "action": "get_all_users", "data": { - "username": api_username, - "key": api_token, + "username": username, + "key": token, "include_deactivated": "1" } } - logging.debug(hf.make_api_call(api_url, payload)) - return hf.make_api_call(api_url, payload) - -# Function to update user information -def update_user(api_url, api_username, api_token, user_username, user_name, user_surname, user_email, user_password): + logging.debug(hf.make_api_call(url, payload)) + return hf.make_api_call(url, payload) + +def update_user(url, username, token, user_username, user_name, user_surname, user_email, user_password): + """Updates the user information based on the given data from the parameters. + + Parameters: + url(str): The link to access to the api. + username(str): The username to use to access the api. + token(str): The token required to access the api. + user_username(str): The username of the user to update. + user_name(str): The updated name for the username. + user_surname(str): The updated surname for the username. + user_email(str): The updated email for the username. + user_password(str): The updated password for the username. + + Returns: + A dictionary with updated data on the user. + """ payload = { "group": "users", "action": "update", "data": { - "username": api_username, - "key": api_token, + "username": username, + "key": token, "user_username": user_username, "user_name": user_name, "user_surname": user_surname, @@ -50,9 +60,17 @@ def update_user(api_url, api_username, api_token, user_username, user_name, user "user_password": user_password } } - return hf.make_api_call(api_url, payload) + return hf.make_api_call(url, payload) def main(api_base_url, api_username, api_token, dry_run): + """The main functionality of the script. Anonimizes users who have been deactivated + + Parameters: + api_base_url(str): The base url for the FOSSID workbench. + api_username(str): The username to use to access the api. + api_token(str): The required token to access the api. + dry_run(boolean): True if a dry run was requested. + """ # Ensure the API URL ends with /api.php if not api_base_url.endswith('/api.php'): api_url = api_base_url.rstrip('/') + '/api.php' @@ -81,6 +99,7 @@ def main(api_base_url, api_username, api_token, dry_run): "updated_username": updated_username }) + #Performes the dry run and then returns before anonimizing any users. if dry_run: if users_to_update: logging.info("Dry Run enabled! The following users would be updated:") @@ -118,6 +137,7 @@ def main(api_base_url, api_username, api_token, dry_run): except Exception as e: logging.error(f"An error occurred: {e}") +#sets up the arugments to run the script if __name__ == "__main__": parser = argparse.ArgumentParser(description='Anonymize deactivated users in Workbench.') parser.add_argument('--workbench-url', type=str, help='The Workbench API URL') @@ -135,4 +155,5 @@ def main(api_base_url, api_username, api_token, dry_run): logging.error("The Workbench URL, username, and token must be provided either as arguments or environment variables.") exit(1) + #calls the main function to run the script and anonimize deactivited users main(api_base_url, api_username, api_token, args.dry_run) \ No newline at end of file diff --git a/archive-stale-scans/archive_stale_scans.py b/archive-stale-scans/archive_stale_scans.py index 388a56e..deb49bd 100644 --- a/archive-stale-scans/archive_stale_scans.py +++ b/archive-stale-scans/archive_stale_scans.py @@ -22,28 +22,16 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -#session = requests.Session() - - -#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: -# """Helper function to make API calls.""" -# try: -# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) -# response = requests.post(url, json=payload, timeout=10) -# response.raise_for_status() -# logging.debug("Received response: %s", response.text) -# return response.json().get("data", {}) -# except requests.exceptions.RequestException as e: -# logging.error("API call failed: %s", str(e)) -# raise -# except json.JSONDecodeError as e: -# logging.error("Failed to parse JSON response: %s", str(e)) -# raise - - def list_scans(url: str, username: str, token: str) -> Dict[str, Any]: - """List all scans.""" + """List all scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + Returns: + A dicitionary with each scan. + """ payload = { "group": "scans", "action": "list_scans", @@ -52,10 +40,18 @@ def list_scans(url: str, username: str, token: str) -> Dict[str, Any]: return hf.make_api_call(url, payload) -def get_scan_info( - url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Get scan info for each scan.""" +def get_scan_info(url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Get scan info for each scan. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code for the scan to get the info on. + + Returns: + A dicitonary with the info on the given scan code. + """ payload = { "group": "scans", "action": "get_information", @@ -64,10 +60,18 @@ def get_scan_info( return hf.make_api_call(url, payload) -def get_project_info( - url: str, username: str, token: str, project_code: str -) -> Dict[str, Any]: - """Get the project name for each scan's project code.""" +def get_project_info(url: str, username: str, token: str, project_code: str) -> Dict[str, Any]: + """Get the project name for each scan's project code. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + project_code(str): The code for the project to get the info on. + + Returns: + A dicitionary with the data on the given project. + """ payload = { "group": "projects", "action": "get_information", @@ -77,7 +81,17 @@ def get_project_info( def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: - """Archive a scan.""" + """Archives a scan. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code for the scan to get the info on. + + Returns: + Boolean; True if the scan was succesfully archived, false otherwise. + """ payload = { "group": "scans", "action": "archive_scan", @@ -92,22 +106,39 @@ def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: return False -def find_old_scans( - scans: Dict[str, Any], url: str, username: str, token: str, days: int -) -> List[Tuple[str, str, str, datetime, datetime]]: - """Find scans that were last updated before the specified days.""" +def find_old_scans(scans: Dict[str, Any], url: str, username: str, token: str, days: int) -> List[Tuple[str, str, str, datetime, datetime]]: + """Find scans that were last updated before the specified days. + + Parameters: + scans(dict[str, any]): A dicitonary containing the scans + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The amount of days passed for a scan to be old. + + Returns: + A list of tuples that contains that have not been updated since the given + amount of days. + """ + old_scans = [] time_limit = datetime.now() - timedelta(days=days) + for scan_info in scans.values(): + scan_code = scan_info["code"] scan_details = get_scan_info(url, username, token, scan_code) + if scan_details["is_archived"]: continue + creation_date = datetime.strptime(scan_details["created"], "%Y-%m-%d %H:%M:%S") update_date = datetime.strptime(scan_details["updated"], "%Y-%m-%d %H:%M:%S") + if update_date < time_limit: project_code = scan_details.get("project_code") project_name = "No Project" + if project_code: project_info = get_project_info(url, username, token, project_code) project_name = project_info.get("project_name", "Unknown Project") @@ -120,27 +151,41 @@ def find_old_scans( update_date, ) ) + return old_scans def display_scans(scans: List[Tuple[str, str, str, datetime, datetime]], dry_run: bool): - """Display scans that would be archived.""" + """Display scans that would be archived. + + Parameters: + scans(List[Tuple[str, str, str, datetime, datetime]]): The scans to check through. + dry_run(boolean): True if the run should be a dry run, false otherwise. + """ if dry_run: logging.info("Dry Run enabled! These scans would be archived:") else: logging.info("These scans will be archived:") - headers = ["PROJECT NAME", "SCAN NAME", "SCAN AGE (days)", "LAST MODIFIED"] - table = [ - [project_name, scan_name, (datetime.now() - update_date).days, update_date] - for project_name, scan_name, _, _, update_date in scans - ] - print(tabulate(table, headers, tablefmt="fancy_grid")) - - -def fetch_and_find_old_scans( - url: str, username: str, token: str, days: int -) -> List[Tuple[str, str, str, datetime, datetime]]: - """Fetch scans and find the ones that are older than the specified number of days.""" + headers = ["PROJECT NAME", "SCAN NAME", "SCAN AGE (days)", "LAST MODIFIED"] + table = [ + [project_name, scan_name, (datetime.now() - update_date).days, update_date] + for project_name, scan_name, _, _, update_date in scans + ] + print(tabulate(table, headers, tablefmt="fancy_grid")) + + +def fetch_and_find_old_scans(url: str, username: str, token: str, days: int) -> List[Tuple[str, str, str, datetime, datetime]]: + """Fetch scans and find the ones that are older than the specified number of days. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The amount of days passed for a scan to be old + + Returns: + A list of tuples of the old scans. + """ logging.info("Fetching scans from Workbench...") try: scans = list_scans(url, username, token) @@ -152,14 +197,17 @@ def fetch_and_find_old_scans( return find_old_scans(scans, url, username, token, days) -def archive_scans( - url: str, - username: str, - token: str, - scans: List[Tuple[str, str, str, datetime, datetime]], -): - """Archive the specified scans.""" +def archive_scans(url: str, username: str, token: str, scans: List[Tuple[str, str, str, datetime, datetime]]): + """Archive the specified scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scans(List[Tuple[str, str, str, datetime, datetime]]): The scans to archive. + """ for project_name, scan_name, scan_code, _, _ in scans: + logging.info("Archiving scan: %s (%s)", scan_name, project_name) if archive_scan(url, username, token, scan_code): logging.info("Archived scan: %s", scan_name) @@ -168,7 +216,16 @@ def archive_scans( def main(url: str, username: str, token: str, days: int, dry_run: bool): - """Main function to archive old scans.""" + """Main function to archive old scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The required amount of days passed for a scan to be considered old. + dry_run(boolean): True if dry run was enabled. + """ + old_scans = fetch_and_find_old_scans(url, username, token, days) if not old_scans: logging.info("No scans were last updated more than %d days ago. Exiting.", days) @@ -186,7 +243,7 @@ def main(url: str, username: str, token: str, days: int, dry_run: bool): archive_scans(url, username, token, old_scans) - +#sets up the arguments if __name__ == "__main__": parser = argparse.ArgumentParser(description="Archive old scans.") parser.add_argument("--workbench-url", type=str, help="The Workbench API URL") @@ -221,4 +278,5 @@ def main(url: str, username: str, token: str, days: int, dry_run: bool): if not api_url.endswith("/api.php"): api_url += "/api.php" + #calls the main function to acrive the stale scans main(api_url, api_username, api_token, args.days, args.dry_run) diff --git a/post-scan-gates/post_scan_gates.py b/post-scan-gates/post_scan_gates.py index 5275a0c..6814b95 100644 --- a/post-scan-gates/post_scan_gates.py +++ b/post-scan-gates/post_scan_gates.py @@ -32,25 +32,21 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -#session = requests.Session() - - -#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: -# """Helper function to make API calls.""" -# try: -# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) -# response = requests.post(url, json=payload, timeout=10) -# response.raise_for_status() -# logging.debug("Received response: %s", response.text) -# return response.json().get("data", {}) -# except requests.exceptions.RequestException as e: -# logging.error("API call failed: %s", str(e)) -# raise -# except json.JSONDecodeError as e: -# logging.error("Failed to parse JSON response: %s", str(e)) -# raise - +#Helper function to make API calls +"""def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: + try: + logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) + response = requests.post(url, json=payload, timeout=10) + response.raise_for_status() + logging.debug("Received response: %s", response.text) + return response.json().get("data", {}) + except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) + raise + except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) + raise +""" def check_scan_status( api_url: str, username: str, token: str, scan_code: str diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index c93a663..fcecc2b 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -46,22 +46,23 @@ def __init__(self, url: str, username: str, token: str): self.username = username self.token = token -# def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: -# """Helper function to make API calls.""" -# try: -# logging.debug( -# "Making API call with payload: %s", json.dumps(payload, indent=2) -# ) -# response = session.post(self.url, json=payload, timeout=10) -# response.raise_for_status() -# logging.debug("Received response: %s", response.text) -# return response.json().get("data", {}) -# except requests.exceptions.RequestException as e: -# logging.error("API call failed: %s", str(e)) -# raise -# except json.JSONDecodeError as e: -# logging.error("Failed to parse JSON response: %s", str(e)) -# raise + #Helper function to make API calls. + """def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: + try: + logging.debug( + "Making API call with payload: %s", json.dumps(payload, indent=2) + ) + response = session.post(self.url, json=payload, timeout=10) + response.raise_for_status() + logging.debug("Received response: %s", response.text) + return response.json().get("data", {}) + except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) + raise + except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) + raise +""" def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, Any]: """Check Workbench scan status.""" diff --git a/quick-scan/quick_scan.py b/quick-scan/quick_scan.py index b1e861d..4c91150 100644 --- a/quick-scan/quick_scan.py +++ b/quick-scan/quick_scan.py @@ -19,25 +19,21 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -#session = requests.Session() - - -#def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: -# """Helper function to make API calls.""" -# try: -# logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) -# response = requests.post(url, json=payload, timeout=10) -# response.raise_for_status() -# logging.debug("Received response: %s", response.text) -# return response.json().get("data", {}) -# except requests.exceptions.RequestException as e: -# logging.error("API call failed: %s", str(e)) -# raise -# except json.JSONDecodeError as e: -# logging.error("Failed to parse JSON response: %s", str(e)) -# raise - +#Helper function to make API calls. +"""def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: + try: + logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) + response = requests.post(url, json=payload, timeout=10) + response.raise_for_status() + logging.debug("Received response: %s", response.text) + return response.json().get("data", {}) + except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) + raise + except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) + raise +""" def quick_scan( api_url: str, api_user: str, api_key: str, file_content: str From 2db16acdc2a3239cd2c8644bf8476323a73b5377 Mon Sep 17 00:00:00 2001 From: Eli Dross Date: Fri, 13 Dec 2024 16:15:13 -0500 Subject: [PATCH 4/5] Cleaned up API workbench code and added comments. --- post-scan-gates/post_scan_gates.py | 158 +++++++++++++++++-------- post-scan-reports/post_scan_reports.py | 113 ++++++++++++------ quick-scan/quick_scan.py | 62 ++++++---- 3 files changed, 223 insertions(+), 110 deletions(-) diff --git a/post-scan-gates/post_scan_gates.py b/post-scan-gates/post_scan_gates.py index 6814b95..5b26e98 100644 --- a/post-scan-gates/post_scan_gates.py +++ b/post-scan-gates/post_scan_gates.py @@ -32,50 +32,65 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -#Helper function to make API calls -"""def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = requests.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise -""" - -def check_scan_status( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check the status of the scan.""" +def check_scan_status(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Checks the status of the scan. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The response from the scan api call. + """ payload = create_payload(username, token, scan_code, API_ACTION_CHECK_STATUS) return hf.make_api_call(api_url, payload) -def check_pending_identifications( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check for pending identifications in the scan.""" +def check_pending_identifications(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Checks for pending identifications in the scan. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The response from the api call. + """ payload = create_payload(username, token, scan_code, API_ACTION_GET_PENDING_FILES) return hf.make_api_call(api_url, payload) -def check_policy_violations( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check for policy violations in the scan.""" +def check_policy_violations(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Checks for policy violations in the scan. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The response from the api call.""" payload = create_payload(username, token, scan_code, API_ACTION_GET_POLICY_WARNINGS) return hf.make_api_call(api_url, payload) -def create_payload( - username: str, token: str, scan_code: str, action: str -) -> Dict[str, Any]: - """Create payload for API calls.""" +def create_payload(username: str, token: str, scan_code: str, action: str) -> Dict[str, Any]: + """Create payload for API calls. + + Parameters: + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + action(str): The given action to input into the payload for the api to do. + + Returns: + The payload dictionary to use. + """ return { "group": "scans", "action": action, @@ -84,14 +99,29 @@ def create_payload( def validate_and_get_api_url(url: str) -> str: - """Validate and construct the API URL.""" + """Validate and construct the API URL. + + Parameters: + url(str): The url to access the api. + + Returns: + The validated URL + """ if not url.endswith("/api.php"): return url.rstrip("/") + "/api.php" return url def generate_links(base_url: str, scan_id: int) -> Dict[str, str]: - """Generate links for scan results.""" + """Generate links for scan results. + + Parameters: + base_url(str): The url to access the api. + scan_id(int): the ID of the scan to create links for. + + Returns: + A dictionary containing all the links of scan results. + """ return { "scan_link": ( f"{base_url}/index.html?form=main_interface&action=scanview&sid={scan_id}" @@ -105,11 +135,16 @@ def generate_links(base_url: str, scan_id: int) -> Dict[str, str]: def wait_for_scan_completion(api_url: str, config: Dict[str, Any]) -> None: - """Wait for the scan to complete.""" + """Wait for the scan to complete. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + """ + logging.info("Checking Scan Status...") - scan_status = check_scan_status( - api_url, config["username"], config["token"], config["scan_code"] - ) + scan_status = check_scan_status(api_url, config["username"], config["token"], config["scan_code"]) + while scan_status.get("status") != "FINISHED": logging.info( "Scan status: %s, waiting to complete...", @@ -122,14 +157,20 @@ def wait_for_scan_completion(api_url: str, config: Dict[str, Any]) -> None: logging.info("The Scan completed!") -def check_pending_files( - api_url: str, config: Dict[str, Any], links: Dict[str, str] -) -> None: - """Check for pending files and exit if any are found.""" +def check_pending_files(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> None: + """Check for pending files and exit if any are found. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + links(Dict[str, str]): Contains the links to use to get scan results + """ + logging.info("Checking if any files have Pending Identifications...") pending_files = check_pending_identifications( api_url, config["username"], config["token"], config["scan_code"] ) + if pending_files: file_names = list(pending_files.values()) if file_names: @@ -138,20 +179,32 @@ def check_pending_files( if config["show_files"]: logging.info("Pending files: %s", ", ".join(file_names)) sys.exit(1) + logging.info("No files have Pending Identifications.") def check_policy(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> None: - """Check for policy violations and exit if any are found.""" + """Checks for policy violations and exit if any are found. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + links(Dict[str, str]): Contains the links to use to get scan results + """ + if config["policy_check"]: + logging.info("Checking if any files introduce policy violations...") policy_violations = check_policy_violations( api_url, config["username"], config["token"], config["scan_code"] ) policy_warnings = policy_violations.get("policy_warnings_list", []) + if policy_warnings: + logging.info("Policy violations found!") for warning in policy_warnings: + if warning.get("license_id"): logging.info( "License Violation: %s - %s files", @@ -168,17 +221,24 @@ def check_policy(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> sys.exit(1) logging.info("No policy violations found.") - -def get_scan_information( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Get scan information from the API.""" +def get_scan_information(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Retrieves all the scan information from the API. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The api response from the fossid workbench. + """ payload = create_payload(username, token, scan_code, "get_information") return hf.make_api_call(api_url, payload) - def main(): """Main function to orchestrate scan checks.""" + parser = argparse.ArgumentParser( description=( "Check scan status and pending identifications, " diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index fcecc2b..06c6cda 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -46,26 +46,18 @@ def __init__(self, url: str, username: str, token: str): self.username = username self.token = token - #Helper function to make API calls. - """def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: - try: - logging.debug( - "Making API call with payload: %s", json.dumps(payload, indent=2) - ) - response = session.post(self.url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise -""" - def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, Any]: - """Check Workbench scan status.""" + """Check Workbench scan status. + + Parameters: + self(ApiClient): The instance of the API client to check the scan status of. + scan_code(str): The code that identifies the scan to check. + process_id(str): The process id if one is needed. + + Returns: + A dicitonary of the data returned from the check scan api call. + """ + payload = { "group": "scans", "action": "check_status", @@ -76,12 +68,23 @@ def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, "delay_response": "1", }, } + if process_id: payload["data"]["process_id"] = process_id return self.make_api_call(payload) def generate_report(self, scan_code: str, report_type: str) -> Tuple[str, str]: - """Generate Workbench report.""" + """Generate Workbench report. + + Parameters: + self(ApiClient): The instance of the API client to cgenerate a report on. + scan_code(str): The code that identifies the scan to use. + report_type(str): The type of report to generate. + + Returns: + A tuple of the data returned from the generate report api call. + """ + payload = { "group": "scans", "action": "generate_report", @@ -95,13 +98,22 @@ def generate_report(self, scan_code: str, report_type: str) -> Tuple[str, str]: "async": "1", }, } + response_data = self.make_api_call(payload) return response_data.get("process_queue_id"), response_data.get( "generation_process", {} ).get("id") def create_output_dir(self, output_dir: str) -> str: - """Create the output directory if it doesn't exist.""" + """Create the output directory if it doesn't exist. + + Parameters: + self(ApiClient): The instance of the API client to cretea the output dictionary of. + output_dir(str): The output directory for the scan and its reports. + + Returns: + The updated output directory. + """ if output_dir and not os.path.isdir(output_dir): try: os.makedirs(output_dir, exist_ok=True) @@ -118,10 +130,17 @@ def create_output_dir(self, output_dir: str) -> str: output_dir = "" return output_dir - def download_report( - self, scan_code: str, process_queue_id: str, report_type: str, output_dir: str - ) -> None: - """Download report. If output_dir is set it will save it to that path.""" + def download_report(self, scan_code: str, process_queue_id: str, report_type: str, output_dir: str) -> None: + """Downloads report. If output_dir is set it will save it to that path. + + Parameters: + self(ApiClient): The instance of the API client to download a report of. + scan_code(str): The code that identifies the scan. + process_queue_id(str): The process id if one is needed. + report_type(str): The type of report to download. + output_dir(str): The output directory for the scan and its reports. + """ + payload = { "group": "download", "action": "download_report", @@ -132,6 +151,7 @@ def download_report( "process_id": process_queue_id, }, } + response = requests.post(self.url, json=payload, timeout=120) response.raise_for_status() @@ -147,6 +167,7 @@ def download_report( file_name = f"{scan_code}_{report_type}_report.{file_extension}" output_dir = self.create_output_dir(output_dir) + if output_dir: try: file_name = os.path.join(output_dir, file_name) @@ -154,6 +175,7 @@ def download_report( logging.error( "Error joining output dir with filename: %s | %s", output_dir, str(ex) ) + contents = self.process_report_contents(response, report_type) mode = "w" if isinstance(contents, str) else "wb" with open(file_name, mode, encoding="utf-8" if mode == "w" else None) as f: @@ -161,7 +183,14 @@ def download_report( logging.info("Report downloaded and saved as %s", file_name) def process_report_contents(self, response, report_type: str): - """Process report contents based on report type.""" + """Process report contents based on report type. + + Parameters: + self(ApiClient): The instance of the API client to process reports of. + response(JSON): The response from the api call made. + report_type(str): The type of report to download. + """ + contents = response.content if report_type == "dynamic_top_matched_components": try: @@ -177,19 +206,23 @@ def process_report_contents(self, response, report_type: str): ) return contents -def process_report_type( - client: ApiClient, - scan_code: str, - report_type: str, - check_interval: int, - output_dir: str, -) -> None: - """Process report type for the scan.""" +def process_report_type(client: ApiClient,scan_code: str,report_type: str,check_interval: int,output_dir: str,) -> None: + """Process report type for the scan. + + Parameters: + client(ApiClient): The instance of the API client to download a report of. + scan_code(str): The code that identifies the scan. + report_type(str): The type of report to download. + check_interval(int) The amount of time to sleep before checking the scan status again.: + output_dir(str): The output directory for the scan and its reports. + """ + logging.info("Generating %s report...", report_type) process_queue_id, generation_process_id = client.generate_report(scan_code, report_type) logging.info("Report generation started. Process ID: %s", process_queue_id) logging.info("Checking %s Report Generation Status...", report_type) report_status = client.check_scan_status(scan_code, process_id=generation_process_id) + while report_status["status"] != "FINISHED": logging.info( "Report generation status: %s, waiting to complete...", @@ -197,14 +230,21 @@ def process_report_type( ) time.sleep(check_interval) report_status = client.check_scan_status(scan_code, process_id=generation_process_id) + logging.info("%s Report generation completed.", report_type) logging.info("Downloading %s report...", report_type) client.download_report(scan_code, process_queue_id, report_type, output_dir) def main(config_data: Dict[str, Any]) -> None: - """Main function to check scan status, generate and download report.""" + """Main function to check scan status, generate and download report. + + Parameters: + config_data(Dict[str, Any]): The data needed to check scan status and make reports. + """ + client = ApiClient(config_data["url"], config_data["username"], config_data["token"]) try: + logging.info("Checking Scan: %s Status...", config_data["scan_code"]) scan_status = client.check_scan_status(config_data["scan_code"]) while scan_status["status"] != "FINISHED": @@ -226,6 +266,7 @@ def main(config_data: Dict[str, Any]) -> None: client, config_data["scan_code"], config_data["report_type"], config_data["check_interval"], config_data["output_dir"] ) + except requests.exceptions.RequestException as e: logging.error("A requests exception occurred: %s", str(e)) except json.JSONDecodeError as e: @@ -234,6 +275,8 @@ def main(config_data: Dict[str, Any]) -> None: logging.error("An OS error occurred: %s", str(e)) if __name__ == "__main__": + """Sets up the arugments.""" + parser = argparse.ArgumentParser( description="Check scan status, generate and download report.", epilog="Example: python script.py --scan-code SCAN123 --report-types xlsx spdx", diff --git a/quick-scan/quick_scan.py b/quick-scan/quick_scan.py index 4c91150..77a9b85 100644 --- a/quick-scan/quick_scan.py +++ b/quick-scan/quick_scan.py @@ -19,26 +19,18 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -#Helper function to make API calls. -"""def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = requests.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise -""" - -def quick_scan( - api_url: str, api_user: str, api_key: str, file_content: str -) -> Dict[str, Any]: - """Perform the quick scan""" +def quick_scan(api_url: str, api_user: str, api_key: str, file_content: str) -> Dict[str, Any]: + """Performs the quick scan. + + Parameters: + api_url(str): the url to access the api. + api_user(str): the username to access the fossid api. + api_key(str): the required key to access the fossid api. + file_path(str): the path to the required file content to peform the scan on. + + Returns: + A dictionary of the api response to the quick scan. + """ payload = { "group": "quick_scan", "action": "scan_one_file", @@ -52,21 +44,33 @@ def quick_scan( } return hf.make_api_call(api_url, payload) - def format_scan_result(result_data: Dict[str, Any], quick_view_link: str) -> str: - """Format the scan result for display""" + """Format the scan result for display. + + Parameters: + result_data(Dict[str, Any]): The data returned from the quick scan api call. + quick_view_link(str): A link that gives a view of the scanned file. + + Returns: + A string with scan result data and links to access it if the scan worked as intended. + """ + component = result_data.get("component") match_type = result_data.get("type") + if component: + artifact = component.get("artifact") author = component.get("author") if match_type == "file": + return ( f"This entire file seems to originate from the {artifact} " f"repository by {author}. Drop this file into the Quick View in Workbench for " f"more information. You can access it here: {quick_view_link}" ) if match_type == "partial": + remote_size = result_data["snippet"].get("remote_size") return ( f"This file has {remote_size} lines that look like they're from " @@ -76,11 +80,16 @@ def format_scan_result(result_data: Dict[str, Any], quick_view_link: str) -> str return "Unknown match type." return "No matches found." +def main(api_url: str, api_user: str, api_key: str, file_path: str, raw_output: bool): + """Main function to perform the quick scan and print the results. + + Parameters: + api_url(str): the url to access the api. + api_user(str): the username to access the fossid api. + api_key(str): the required key to access the fossid api. + file_path(str): the path to the required file content to peform the scan on. + """ -def main( - api_url: str, api_user: str, api_key: str, file_path: str, raw_output: bool -): - """Main function to perform the quick scan and print the results.""" # Ensure the API URL ends with /api.php and doesn't contain it twice if not api_url.endswith("/api.php"): api_url = api_url.rstrip("/") + "/api.php" @@ -116,6 +125,7 @@ def main( if __name__ == "__main__": + """Sets up the arugments.""" parser = argparse.ArgumentParser( description="Perform a quick scan of a single file." ) From d281705c8105835370bc17005a6cb9e64cb221aa Mon Sep 17 00:00:00 2001 From: Eli Dross Date: Tue, 17 Dec 2024 14:49:24 -0500 Subject: [PATCH 5/5] optimized post_scan_gates --- post-scan-gates/post_scan_gates.py | 54 ++++++-------------------- post-scan-reports/post_scan_reports.py | 13 ++----- quick-scan/quick_scan.py | 2 + 3 files changed, 16 insertions(+), 53 deletions(-) diff --git a/post-scan-gates/post_scan_gates.py b/post-scan-gates/post_scan_gates.py index 5b26e98..5e68a13 100644 --- a/post-scan-gates/post_scan_gates.py +++ b/post-scan-gates/post_scan_gates.py @@ -32,8 +32,8 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -def check_scan_status(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: - """Checks the status of the scan. +def check(api_url: str, username: str, token: str, scan_code: str, action: str) -> Dict[str, Any]: + """Checks the given action of the given scan code. Parameters: api_url(str): The url to access the api. @@ -44,41 +44,9 @@ def check_scan_status(api_url: str, username: str, token: str, scan_code: str) - Returns: The response from the scan api call. """ - payload = create_payload(username, token, scan_code, API_ACTION_CHECK_STATUS) + payload = create_payload(username, token, scan_code, action) return hf.make_api_call(api_url, payload) - -def check_pending_identifications(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: - """Checks for pending identifications in the scan. - - Parameters: - api_url(str): The url to access the api. - username(str): The username to use for the api. - token(str): The required token to access the api. - scan_code(str): The code to identify the scan to check. - - Returns: - The response from the api call. - """ - payload = create_payload(username, token, scan_code, API_ACTION_GET_PENDING_FILES) - return hf.make_api_call(api_url, payload) - - -def check_policy_violations(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: - """Checks for policy violations in the scan. - - Parameters: - api_url(str): The url to access the api. - username(str): The username to use for the api. - token(str): The required token to access the api. - scan_code(str): The code to identify the scan to check. - - Returns: - The response from the api call.""" - payload = create_payload(username, token, scan_code, API_ACTION_GET_POLICY_WARNINGS) - return hf.make_api_call(api_url, payload) - - def create_payload(username: str, token: str, scan_code: str, action: str) -> Dict[str, Any]: """Create payload for API calls. @@ -143,7 +111,7 @@ def wait_for_scan_completion(api_url: str, config: Dict[str, Any]) -> None: """ logging.info("Checking Scan Status...") - scan_status = check_scan_status(api_url, config["username"], config["token"], config["scan_code"]) + scan_status = check(api_url, config["username"], config["token"], config["scan_code"], API_ACTION_CHECK_STATUS) while scan_status.get("status") != "FINISHED": logging.info( @@ -151,8 +119,8 @@ def wait_for_scan_completion(api_url: str, config: Dict[str, Any]) -> None: scan_status.get("status", "UNKNOWN"), ) time.sleep(config["interval"]) - scan_status = check_scan_status( - api_url, config["username"], config["token"], config["scan_code"] + scan_status = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_CHECK_STATUS ) logging.info("The Scan completed!") @@ -167,8 +135,8 @@ def check_pending_files(api_url: str, config: Dict[str, Any], links: Dict[str, s """ logging.info("Checking if any files have Pending Identifications...") - pending_files = check_pending_identifications( - api_url, config["username"], config["token"], config["scan_code"] + pending_files = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_GET_PENDING_FILES ) if pending_files: @@ -195,8 +163,8 @@ def check_policy(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> if config["policy_check"]: logging.info("Checking if any files introduce policy violations...") - policy_violations = check_policy_violations( - api_url, config["username"], config["token"], config["scan_code"] + policy_violations = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_GET_POLICY_WARNINGS ) policy_warnings = policy_violations.get("policy_warnings_list", []) @@ -238,7 +206,7 @@ def get_scan_information(api_url: str, username: str, token: str, scan_code: str def main(): """Main function to orchestrate scan checks.""" - + parser = argparse.ArgumentParser( description=( "Check scan status and pending identifications, " diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index 06c6cda..8e0aa60 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -18,12 +18,7 @@ import requests # Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# Create a session object for making requests -#session = requests.Session() +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # List of all report types REPORT_TYPES = [ @@ -37,9 +32,7 @@ ] class ApiClient: - """ - This class represents an API client for making requests to the FossID Workbench API. - """ + """This class represents an API client for making requests to the FossID Workbench API.""" def __init__(self, url: str, username: str, token: str): self.url = url @@ -276,7 +269,7 @@ def main(config_data: Dict[str, Any]) -> None: if __name__ == "__main__": """Sets up the arugments.""" - + parser = argparse.ArgumentParser( description="Check scan status, generate and download report.", epilog="Example: python script.py --scan-code SCAN123 --report-types xlsx spdx", diff --git a/quick-scan/quick_scan.py b/quick-scan/quick_scan.py index 77a9b85..606e574 100644 --- a/quick-scan/quick_scan.py +++ b/quick-scan/quick_scan.py @@ -117,9 +117,11 @@ def main(api_url: str, api_user: str, api_key: str, file_path: str, raw_output: else: logging.info("No matches found.") except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) sys.exit(1) except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) sys.exit(1)