diff --git a/README.md b/README.md index bf1c610..9748abc 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,10 @@ These scripts were built by the FossID Customer Experience teams in collaboratio These examples demonstrate how to interact with the Workbench API. We do our best to keep the examples updated, but there is no long-term maintainer for this code. We do not use GitHub Issues - for questions or issues with the scripts please use the [FossID Support Portal](https://support.fossid.com/). Thank you! ## Example Scripts +Each script interacts with the FOSSID workbench API. To use our API a payload dictionary is required for each API call. +Each different API call has a different payload with different requirments. To see these requirements, you can look in the +scripts to see each required key for the payload or view it on API website. + The repo has scripts that help you: ### Archive Old Scans @@ -19,6 +23,9 @@ This script downloads reports for a scan - useful if you want to include FossID ### Quickly Scan a File This script scans a single file using the Quick Scan API - helpful for quickly knowing if AI-generated code should be investigated further. +### Helper Functions +This script contains helper functions that are used in the example scripts + ## Contributing Contributions are welcome! We'll review any Pull Requests made. diff --git a/anon-deactivated-users/anon_deactivated_users.py b/anon-deactivated-users/anon_deactivated_users.py index 1f7b80a..5cc3eb8 100644 --- a/anon-deactivated-users/anon_deactivated_users.py +++ b/anon-deactivated-users/anon_deactivated_users.py @@ -3,45 +3,56 @@ import logging import argparse import os +import helper_functions as hf # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -# Create a session object for making requests -session = requests.Session() +def get_all_users(url, username, token): + """ Gets all users in the system -# Helper function to make API calls -def make_api_call(url: str, payload: dict) -> dict: - try: - response = session.post(url, json=payload) - response.raise_for_status() - return response.json() - except requests.exceptions.RequestException as e: - logging.error(f"API call failed: {e}") - raise - -# Function to get all users -def get_all_users(api_url, api_username, api_token): + Parameters: + url(str): The link to acess the api. + username(str): The username to use to access the api. + token(str): The token required to access the api. + + Returns: + A dictionary with every user. + """ payload = { "group": "users", "action": "get_all_users", "data": { - "username": api_username, - "key": api_token, + "username": username, + "key": token, "include_deactivated": "1" } } - logging.debug(make_api_call(api_url, payload)) - return make_api_call(api_url, payload) - -# Function to update user information -def update_user(api_url, api_username, api_token, user_username, user_name, user_surname, user_email, user_password): + logging.debug(hf.make_api_call(url, payload)) + return hf.make_api_call(url, payload) + +def update_user(url, username, token, user_username, user_name, user_surname, user_email, user_password): + """Updates the user information based on the given data from the parameters. + + Parameters: + url(str): The link to access to the api. + username(str): The username to use to access the api. + token(str): The token required to access the api. + user_username(str): The username of the user to update. + user_name(str): The updated name for the username. + user_surname(str): The updated surname for the username. + user_email(str): The updated email for the username. + user_password(str): The updated password for the username. + + Returns: + A dictionary with updated data on the user. + """ payload = { "group": "users", "action": "update", "data": { - "username": api_username, - "key": api_token, + "username": username, + "key": token, "user_username": user_username, "user_name": user_name, "user_surname": user_surname, @@ -49,9 +60,17 @@ def update_user(api_url, api_username, api_token, user_username, user_name, user "user_password": user_password } } - return make_api_call(api_url, payload) + return hf.make_api_call(url, payload) def main(api_base_url, api_username, api_token, dry_run): + """The main functionality of the script. Anonimizes users who have been deactivated + + Parameters: + api_base_url(str): The base url for the FOSSID workbench. + api_username(str): The username to use to access the api. + api_token(str): The required token to access the api. + dry_run(boolean): True if a dry run was requested. + """ # Ensure the API URL ends with /api.php if not api_base_url.endswith('/api.php'): api_url = api_base_url.rstrip('/') + '/api.php' @@ -80,6 +99,7 @@ def main(api_base_url, api_username, api_token, dry_run): "updated_username": updated_username }) + #Performes the dry run and then returns before anonimizing any users. if dry_run: if users_to_update: logging.info("Dry Run enabled! The following users would be updated:") @@ -117,6 +137,7 @@ def main(api_base_url, api_username, api_token, dry_run): except Exception as e: logging.error(f"An error occurred: {e}") +#sets up the arugments to run the script if __name__ == "__main__": parser = argparse.ArgumentParser(description='Anonymize deactivated users in Workbench.') parser.add_argument('--workbench-url', type=str, help='The Workbench API URL') @@ -134,4 +155,5 @@ def main(api_base_url, api_username, api_token, dry_run): logging.error("The Workbench URL, username, and token must be provided either as arguments or environment variables.") exit(1) + #calls the main function to run the script and anonimize deactivited users main(api_base_url, api_username, api_token, args.dry_run) \ No newline at end of file diff --git a/archive-stale-scans/archive_stale_scans.py b/archive-stale-scans/archive_stale_scans.py index d5c08f4..deb49bd 100644 --- a/archive-stale-scans/archive_stale_scans.py +++ b/archive-stale-scans/archive_stale_scans.py @@ -13,7 +13,7 @@ import argparse import os from typing import List, Tuple, Dict, Any - +import helper_functions as hf import requests from tabulate import tabulate @@ -22,69 +22,83 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -session = requests.Session() - - -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise - - def list_scans(url: str, username: str, token: str) -> Dict[str, Any]: - """List all scans.""" + """List all scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + Returns: + A dicitionary with each scan. + """ payload = { "group": "scans", "action": "list_scans", "data": {"username": username, "key": token}, } - return make_api_call(url, payload) - - -def get_scan_info( - url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Get scan info for each scan.""" + return hf.make_api_call(url, payload) + + +def get_scan_info(url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Get scan info for each scan. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code for the scan to get the info on. + + Returns: + A dicitonary with the info on the given scan code. + """ payload = { "group": "scans", "action": "get_information", "data": {"username": username, "key": token, "scan_code": scan_code}, } - return make_api_call(url, payload) - - -def get_project_info( - url: str, username: str, token: str, project_code: str -) -> Dict[str, Any]: - """Get the project name for each scan's project code.""" + return hf.make_api_call(url, payload) + + +def get_project_info(url: str, username: str, token: str, project_code: str) -> Dict[str, Any]: + """Get the project name for each scan's project code. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + project_code(str): The code for the project to get the info on. + + Returns: + A dicitionary with the data on the given project. + """ payload = { "group": "projects", "action": "get_information", "data": {"username": username, "key": token, "project_code": project_code}, } - return make_api_call(url, payload) + return hf.make_api_call(url, payload) def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: - """Archive a scan.""" + """Archives a scan. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code for the scan to get the info on. + + Returns: + Boolean; True if the scan was succesfully archived, false otherwise. + """ payload = { "group": "scans", "action": "archive_scan", "data": {"username": username, "key": token, "scan_code": scan_code}, } try: - response = session.post(url, json=payload) + response = requests.post(url, json=payload) response.raise_for_status() return response.status_code == 200 except requests.exceptions.RequestException as e: @@ -92,22 +106,39 @@ def archive_scan(url: str, username: str, token: str, scan_code: str) -> bool: return False -def find_old_scans( - scans: Dict[str, Any], url: str, username: str, token: str, days: int -) -> List[Tuple[str, str, str, datetime, datetime]]: - """Find scans that were last updated before the specified days.""" +def find_old_scans(scans: Dict[str, Any], url: str, username: str, token: str, days: int) -> List[Tuple[str, str, str, datetime, datetime]]: + """Find scans that were last updated before the specified days. + + Parameters: + scans(dict[str, any]): A dicitonary containing the scans + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The amount of days passed for a scan to be old. + + Returns: + A list of tuples that contains that have not been updated since the given + amount of days. + """ + old_scans = [] time_limit = datetime.now() - timedelta(days=days) + for scan_info in scans.values(): + scan_code = scan_info["code"] scan_details = get_scan_info(url, username, token, scan_code) + if scan_details["is_archived"]: continue + creation_date = datetime.strptime(scan_details["created"], "%Y-%m-%d %H:%M:%S") update_date = datetime.strptime(scan_details["updated"], "%Y-%m-%d %H:%M:%S") + if update_date < time_limit: project_code = scan_details.get("project_code") project_name = "No Project" + if project_code: project_info = get_project_info(url, username, token, project_code) project_name = project_info.get("project_name", "Unknown Project") @@ -120,27 +151,41 @@ def find_old_scans( update_date, ) ) + return old_scans def display_scans(scans: List[Tuple[str, str, str, datetime, datetime]], dry_run: bool): - """Display scans that would be archived.""" + """Display scans that would be archived. + + Parameters: + scans(List[Tuple[str, str, str, datetime, datetime]]): The scans to check through. + dry_run(boolean): True if the run should be a dry run, false otherwise. + """ if dry_run: logging.info("Dry Run enabled! These scans would be archived:") else: logging.info("These scans will be archived:") - headers = ["PROJECT NAME", "SCAN NAME", "SCAN AGE (days)", "LAST MODIFIED"] - table = [ - [project_name, scan_name, (datetime.now() - update_date).days, update_date] - for project_name, scan_name, _, _, update_date in scans - ] - print(tabulate(table, headers, tablefmt="fancy_grid")) - - -def fetch_and_find_old_scans( - url: str, username: str, token: str, days: int -) -> List[Tuple[str, str, str, datetime, datetime]]: - """Fetch scans and find the ones that are older than the specified number of days.""" + headers = ["PROJECT NAME", "SCAN NAME", "SCAN AGE (days)", "LAST MODIFIED"] + table = [ + [project_name, scan_name, (datetime.now() - update_date).days, update_date] + for project_name, scan_name, _, _, update_date in scans + ] + print(tabulate(table, headers, tablefmt="fancy_grid")) + + +def fetch_and_find_old_scans(url: str, username: str, token: str, days: int) -> List[Tuple[str, str, str, datetime, datetime]]: + """Fetch scans and find the ones that are older than the specified number of days. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The amount of days passed for a scan to be old + + Returns: + A list of tuples of the old scans. + """ logging.info("Fetching scans from Workbench...") try: scans = list_scans(url, username, token) @@ -152,14 +197,17 @@ def fetch_and_find_old_scans( return find_old_scans(scans, url, username, token, days) -def archive_scans( - url: str, - username: str, - token: str, - scans: List[Tuple[str, str, str, datetime, datetime]], -): - """Archive the specified scans.""" +def archive_scans(url: str, username: str, token: str, scans: List[Tuple[str, str, str, datetime, datetime]]): + """Archive the specified scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scans(List[Tuple[str, str, str, datetime, datetime]]): The scans to archive. + """ for project_name, scan_name, scan_code, _, _ in scans: + logging.info("Archiving scan: %s (%s)", scan_name, project_name) if archive_scan(url, username, token, scan_code): logging.info("Archived scan: %s", scan_name) @@ -168,7 +216,16 @@ def archive_scans( def main(url: str, username: str, token: str, days: int, dry_run: bool): - """Main function to archive old scans.""" + """Main function to archive old scans. + + Parameters: + url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + days(int): The required amount of days passed for a scan to be considered old. + dry_run(boolean): True if dry run was enabled. + """ + old_scans = fetch_and_find_old_scans(url, username, token, days) if not old_scans: logging.info("No scans were last updated more than %d days ago. Exiting.", days) @@ -186,7 +243,7 @@ def main(url: str, username: str, token: str, days: int, dry_run: bool): archive_scans(url, username, token, old_scans) - +#sets up the arguments if __name__ == "__main__": parser = argparse.ArgumentParser(description="Archive old scans.") parser.add_argument("--workbench-url", type=str, help="The Workbench API URL") @@ -221,4 +278,5 @@ def main(url: str, username: str, token: str, days: int, dry_run: bool): if not api_url.endswith("/api.php"): api_url += "/api.php" + #calls the main function to acrive the stale scans main(api_url, api_username, api_token, args.days, args.dry_run) diff --git a/helper_functions.py b/helper_functions.py new file mode 100644 index 0000000..09981c0 --- /dev/null +++ b/helper_functions.py @@ -0,0 +1,32 @@ +import json +import time +import logging +import argparse +import os +import re +from typing import Dict, Any +import sys +import requests + +def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: + """Helper function to make API calls. + + Parameters: + url(str): The url to access the API. + payload(Dict[str, Any]): A dictionary that contains the data required by the API. + + Returns: + Returns a dicitionary with the API response data + """ + try: + logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) + response = requests.post(url, json=payload, timeout=10) + response.raise_for_status() + logging.debug("Received response: %s", response.text) + return response.json().get("data", {}) + except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) + raise + except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) + raise \ No newline at end of file diff --git a/post-scan-gates/post_scan_gates.py b/post-scan-gates/post_scan_gates.py index 51cc04f..5e68a13 100644 --- a/post-scan-gates/post_scan_gates.py +++ b/post-scan-gates/post_scan_gates.py @@ -20,6 +20,7 @@ from typing import Dict, Any import sys import requests +import helper_functions as hf # Constants API_ACTION_CHECK_STATUS = "check_status" @@ -31,54 +32,33 @@ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -session = requests.Session() - - -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise - - -def check_scan_status( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check the status of the scan.""" - payload = create_payload(username, token, scan_code, API_ACTION_CHECK_STATUS) - return make_api_call(api_url, payload) - - -def check_pending_identifications( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check for pending identifications in the scan.""" - payload = create_payload(username, token, scan_code, API_ACTION_GET_PENDING_FILES) - return make_api_call(api_url, payload) - - -def check_policy_violations( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Check for policy violations in the scan.""" - payload = create_payload(username, token, scan_code, API_ACTION_GET_POLICY_WARNINGS) - return make_api_call(api_url, payload) - - -def create_payload( - username: str, token: str, scan_code: str, action: str -) -> Dict[str, Any]: - """Create payload for API calls.""" +def check(api_url: str, username: str, token: str, scan_code: str, action: str) -> Dict[str, Any]: + """Checks the given action of the given scan code. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The response from the scan api call. + """ + payload = create_payload(username, token, scan_code, action) + return hf.make_api_call(api_url, payload) + +def create_payload(username: str, token: str, scan_code: str, action: str) -> Dict[str, Any]: + """Create payload for API calls. + + Parameters: + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + action(str): The given action to input into the payload for the api to do. + + Returns: + The payload dictionary to use. + """ return { "group": "scans", "action": action, @@ -87,14 +67,29 @@ def create_payload( def validate_and_get_api_url(url: str) -> str: - """Validate and construct the API URL.""" + """Validate and construct the API URL. + + Parameters: + url(str): The url to access the api. + + Returns: + The validated URL + """ if not url.endswith("/api.php"): return url.rstrip("/") + "/api.php" return url def generate_links(base_url: str, scan_id: int) -> Dict[str, str]: - """Generate links for scan results.""" + """Generate links for scan results. + + Parameters: + base_url(str): The url to access the api. + scan_id(int): the ID of the scan to create links for. + + Returns: + A dictionary containing all the links of scan results. + """ return { "scan_link": ( f"{base_url}/index.html?form=main_interface&action=scanview&sid={scan_id}" @@ -108,31 +103,42 @@ def generate_links(base_url: str, scan_id: int) -> Dict[str, str]: def wait_for_scan_completion(api_url: str, config: Dict[str, Any]) -> None: - """Wait for the scan to complete.""" + """Wait for the scan to complete. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + """ + logging.info("Checking Scan Status...") - scan_status = check_scan_status( - api_url, config["username"], config["token"], config["scan_code"] - ) + scan_status = check(api_url, config["username"], config["token"], config["scan_code"], API_ACTION_CHECK_STATUS) + while scan_status.get("status") != "FINISHED": logging.info( "Scan status: %s, waiting to complete...", scan_status.get("status", "UNKNOWN"), ) time.sleep(config["interval"]) - scan_status = check_scan_status( - api_url, config["username"], config["token"], config["scan_code"] + scan_status = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_CHECK_STATUS ) logging.info("The Scan completed!") -def check_pending_files( - api_url: str, config: Dict[str, Any], links: Dict[str, str] -) -> None: - """Check for pending files and exit if any are found.""" +def check_pending_files(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> None: + """Check for pending files and exit if any are found. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + links(Dict[str, str]): Contains the links to use to get scan results + """ + logging.info("Checking if any files have Pending Identifications...") - pending_files = check_pending_identifications( - api_url, config["username"], config["token"], config["scan_code"] + pending_files = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_GET_PENDING_FILES ) + if pending_files: file_names = list(pending_files.values()) if file_names: @@ -141,20 +147,32 @@ def check_pending_files( if config["show_files"]: logging.info("Pending files: %s", ", ".join(file_names)) sys.exit(1) + logging.info("No files have Pending Identifications.") def check_policy(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> None: - """Check for policy violations and exit if any are found.""" + """Checks for policy violations and exit if any are found. + + Parameters: + api_url(str): The url to access the api. + config(Dict[str, Any]): contains the username, token and scan coded needed to check the intended scan. + links(Dict[str, str]): Contains the links to use to get scan results + """ + if config["policy_check"]: + logging.info("Checking if any files introduce policy violations...") - policy_violations = check_policy_violations( - api_url, config["username"], config["token"], config["scan_code"] + policy_violations = check( + api_url, config["username"], config["token"], config["scan_code"], API_ACTION_GET_POLICY_WARNINGS ) policy_warnings = policy_violations.get("policy_warnings_list", []) + if policy_warnings: + logging.info("Policy violations found!") for warning in policy_warnings: + if warning.get("license_id"): logging.info( "License Violation: %s - %s files", @@ -171,17 +189,24 @@ def check_policy(api_url: str, config: Dict[str, Any], links: Dict[str, str]) -> sys.exit(1) logging.info("No policy violations found.") - -def get_scan_information( - api_url: str, username: str, token: str, scan_code: str -) -> Dict[str, Any]: - """Get scan information from the API.""" +def get_scan_information(api_url: str, username: str, token: str, scan_code: str) -> Dict[str, Any]: + """Retrieves all the scan information from the API. + + Parameters: + api_url(str): The url to access the api. + username(str): The username to use for the api. + token(str): The required token to access the api. + scan_code(str): The code to identify the scan to check. + + Returns: + The api response from the fossid workbench. + """ payload = create_payload(username, token, scan_code, "get_information") - return make_api_call(api_url, payload) - + return hf.make_api_call(api_url, payload) def main(): """Main function to orchestrate scan checks.""" + parser = argparse.ArgumentParser( description=( "Check scan status and pending identifications, " diff --git a/post-scan-reports/post_scan_reports.py b/post-scan-reports/post_scan_reports.py index 6d6f572..8e0aa60 100644 --- a/post-scan-reports/post_scan_reports.py +++ b/post-scan-reports/post_scan_reports.py @@ -14,16 +14,11 @@ import argparse import os from typing import Dict, Any, Tuple - +import helper_functions as hf import requests # Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) - -# Create a session object for making requests -session = requests.Session() +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # List of all report types REPORT_TYPES = [ @@ -37,34 +32,25 @@ ] class ApiClient: - """ - This class represents an API client for making requests to the FossID Workbench API. - """ + """This class represents an API client for making requests to the FossID Workbench API.""" def __init__(self, url: str, username: str, token: str): self.url = url self.username = username self.token = token - def make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug( - "Making API call with payload: %s", json.dumps(payload, indent=2) - ) - response = session.post(self.url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise - def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, Any]: - """Check Workbench scan status.""" + """Check Workbench scan status. + + Parameters: + self(ApiClient): The instance of the API client to check the scan status of. + scan_code(str): The code that identifies the scan to check. + process_id(str): The process id if one is needed. + + Returns: + A dicitonary of the data returned from the check scan api call. + """ + payload = { "group": "scans", "action": "check_status", @@ -75,12 +61,23 @@ def check_scan_status(self, scan_code: str, process_id: str = None) -> Dict[str, "delay_response": "1", }, } + if process_id: payload["data"]["process_id"] = process_id return self.make_api_call(payload) def generate_report(self, scan_code: str, report_type: str) -> Tuple[str, str]: - """Generate Workbench report.""" + """Generate Workbench report. + + Parameters: + self(ApiClient): The instance of the API client to cgenerate a report on. + scan_code(str): The code that identifies the scan to use. + report_type(str): The type of report to generate. + + Returns: + A tuple of the data returned from the generate report api call. + """ + payload = { "group": "scans", "action": "generate_report", @@ -94,13 +91,22 @@ def generate_report(self, scan_code: str, report_type: str) -> Tuple[str, str]: "async": "1", }, } + response_data = self.make_api_call(payload) return response_data.get("process_queue_id"), response_data.get( "generation_process", {} ).get("id") def create_output_dir(self, output_dir: str) -> str: - """Create the output directory if it doesn't exist.""" + """Create the output directory if it doesn't exist. + + Parameters: + self(ApiClient): The instance of the API client to cretea the output dictionary of. + output_dir(str): The output directory for the scan and its reports. + + Returns: + The updated output directory. + """ if output_dir and not os.path.isdir(output_dir): try: os.makedirs(output_dir, exist_ok=True) @@ -117,10 +123,17 @@ def create_output_dir(self, output_dir: str) -> str: output_dir = "" return output_dir - def download_report( - self, scan_code: str, process_queue_id: str, report_type: str, output_dir: str - ) -> None: - """Download report. If output_dir is set it will save it to that path.""" + def download_report(self, scan_code: str, process_queue_id: str, report_type: str, output_dir: str) -> None: + """Downloads report. If output_dir is set it will save it to that path. + + Parameters: + self(ApiClient): The instance of the API client to download a report of. + scan_code(str): The code that identifies the scan. + process_queue_id(str): The process id if one is needed. + report_type(str): The type of report to download. + output_dir(str): The output directory for the scan and its reports. + """ + payload = { "group": "download", "action": "download_report", @@ -131,7 +144,8 @@ def download_report( "process_id": process_queue_id, }, } - response = session.post(self.url, json=payload, timeout=120) + + response = requests.post(self.url, json=payload, timeout=120) response.raise_for_status() file_extension = { @@ -146,6 +160,7 @@ def download_report( file_name = f"{scan_code}_{report_type}_report.{file_extension}" output_dir = self.create_output_dir(output_dir) + if output_dir: try: file_name = os.path.join(output_dir, file_name) @@ -153,6 +168,7 @@ def download_report( logging.error( "Error joining output dir with filename: %s | %s", output_dir, str(ex) ) + contents = self.process_report_contents(response, report_type) mode = "w" if isinstance(contents, str) else "wb" with open(file_name, mode, encoding="utf-8" if mode == "w" else None) as f: @@ -160,7 +176,14 @@ def download_report( logging.info("Report downloaded and saved as %s", file_name) def process_report_contents(self, response, report_type: str): - """Process report contents based on report type.""" + """Process report contents based on report type. + + Parameters: + self(ApiClient): The instance of the API client to process reports of. + response(JSON): The response from the api call made. + report_type(str): The type of report to download. + """ + contents = response.content if report_type == "dynamic_top_matched_components": try: @@ -176,19 +199,23 @@ def process_report_contents(self, response, report_type: str): ) return contents -def process_report_type( - client: ApiClient, - scan_code: str, - report_type: str, - check_interval: int, - output_dir: str, -) -> None: - """Process report type for the scan.""" +def process_report_type(client: ApiClient,scan_code: str,report_type: str,check_interval: int,output_dir: str,) -> None: + """Process report type for the scan. + + Parameters: + client(ApiClient): The instance of the API client to download a report of. + scan_code(str): The code that identifies the scan. + report_type(str): The type of report to download. + check_interval(int) The amount of time to sleep before checking the scan status again.: + output_dir(str): The output directory for the scan and its reports. + """ + logging.info("Generating %s report...", report_type) process_queue_id, generation_process_id = client.generate_report(scan_code, report_type) logging.info("Report generation started. Process ID: %s", process_queue_id) logging.info("Checking %s Report Generation Status...", report_type) report_status = client.check_scan_status(scan_code, process_id=generation_process_id) + while report_status["status"] != "FINISHED": logging.info( "Report generation status: %s, waiting to complete...", @@ -196,14 +223,21 @@ def process_report_type( ) time.sleep(check_interval) report_status = client.check_scan_status(scan_code, process_id=generation_process_id) + logging.info("%s Report generation completed.", report_type) logging.info("Downloading %s report...", report_type) client.download_report(scan_code, process_queue_id, report_type, output_dir) def main(config_data: Dict[str, Any]) -> None: - """Main function to check scan status, generate and download report.""" + """Main function to check scan status, generate and download report. + + Parameters: + config_data(Dict[str, Any]): The data needed to check scan status and make reports. + """ + client = ApiClient(config_data["url"], config_data["username"], config_data["token"]) try: + logging.info("Checking Scan: %s Status...", config_data["scan_code"]) scan_status = client.check_scan_status(config_data["scan_code"]) while scan_status["status"] != "FINISHED": @@ -225,6 +259,7 @@ def main(config_data: Dict[str, Any]) -> None: client, config_data["scan_code"], config_data["report_type"], config_data["check_interval"], config_data["output_dir"] ) + except requests.exceptions.RequestException as e: logging.error("A requests exception occurred: %s", str(e)) except json.JSONDecodeError as e: @@ -233,6 +268,8 @@ def main(config_data: Dict[str, Any]) -> None: logging.error("An OS error occurred: %s", str(e)) if __name__ == "__main__": + """Sets up the arugments.""" + parser = argparse.ArgumentParser( description="Check scan status, generate and download report.", epilog="Example: python script.py --scan-code SCAN123 --report-types xlsx spdx", diff --git a/quick-scan/quick_scan.py b/quick-scan/quick_scan.py index 9a8fccf..606e574 100644 --- a/quick-scan/quick_scan.py +++ b/quick-scan/quick_scan.py @@ -12,36 +12,25 @@ import os from typing import Dict, Any import requests +import helper_functions as hf # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -# Create a session object for making requests -session = requests.Session() - - -def make_api_call(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: - """Helper function to make API calls.""" - try: - logging.debug("Making API call with payload: %s", json.dumps(payload, indent=2)) - response = session.post(url, json=payload, timeout=10) - response.raise_for_status() - logging.debug("Received response: %s", response.text) - return response.json().get("data", {}) - except requests.exceptions.RequestException as e: - logging.error("API call failed: %s", str(e)) - raise - except json.JSONDecodeError as e: - logging.error("Failed to parse JSON response: %s", str(e)) - raise - - -def quick_scan( - api_url: str, api_user: str, api_key: str, file_content: str -) -> Dict[str, Any]: - """Perform the quick scan""" +def quick_scan(api_url: str, api_user: str, api_key: str, file_content: str) -> Dict[str, Any]: + """Performs the quick scan. + + Parameters: + api_url(str): the url to access the api. + api_user(str): the username to access the fossid api. + api_key(str): the required key to access the fossid api. + file_path(str): the path to the required file content to peform the scan on. + + Returns: + A dictionary of the api response to the quick scan. + """ payload = { "group": "quick_scan", "action": "scan_one_file", @@ -53,23 +42,35 @@ def quick_scan( "sensitivity": "10", }, } - return make_api_call(api_url, payload) - + return hf.make_api_call(api_url, payload) def format_scan_result(result_data: Dict[str, Any], quick_view_link: str) -> str: - """Format the scan result for display""" + """Format the scan result for display. + + Parameters: + result_data(Dict[str, Any]): The data returned from the quick scan api call. + quick_view_link(str): A link that gives a view of the scanned file. + + Returns: + A string with scan result data and links to access it if the scan worked as intended. + """ + component = result_data.get("component") match_type = result_data.get("type") + if component: + artifact = component.get("artifact") author = component.get("author") if match_type == "file": + return ( f"This entire file seems to originate from the {artifact} " f"repository by {author}. Drop this file into the Quick View in Workbench for " f"more information. You can access it here: {quick_view_link}" ) if match_type == "partial": + remote_size = result_data["snippet"].get("remote_size") return ( f"This file has {remote_size} lines that look like they're from " @@ -79,11 +80,16 @@ def format_scan_result(result_data: Dict[str, Any], quick_view_link: str) -> str return "Unknown match type." return "No matches found." +def main(api_url: str, api_user: str, api_key: str, file_path: str, raw_output: bool): + """Main function to perform the quick scan and print the results. + + Parameters: + api_url(str): the url to access the api. + api_user(str): the username to access the fossid api. + api_key(str): the required key to access the fossid api. + file_path(str): the path to the required file content to peform the scan on. + """ -def main( - api_url: str, api_user: str, api_key: str, file_path: str, raw_output: bool -): - """Main function to perform the quick scan and print the results.""" # Ensure the API URL ends with /api.php and doesn't contain it twice if not api_url.endswith("/api.php"): api_url = api_url.rstrip("/") + "/api.php" @@ -111,14 +117,17 @@ def main( else: logging.info("No matches found.") except requests.exceptions.RequestException as e: + logging.error("API call failed: %s", str(e)) sys.exit(1) except json.JSONDecodeError as e: + logging.error("Failed to parse JSON response: %s", str(e)) sys.exit(1) if __name__ == "__main__": + """Sets up the arugments.""" parser = argparse.ArgumentParser( description="Perform a quick scan of a single file." )