From 1e7ef3f0401a69e5fbd7a9ec980508cf9b155092 Mon Sep 17 00:00:00 2001 From: Daniel Monzonis Date: Wed, 8 Oct 2025 18:55:59 +0200 Subject: [PATCH 1/3] Adapt trustshell to affects v2 This commit adapts the trustshell code to use affects v2, by using the ps_update_stream instead of the ps_module. Note that this functionality relies on osidb_bindings being in a version where it supports affects v2. Closes OSIDB-4494. --- src/trustshell/osidb.py | 63 +++++++++++++++++++------------------- src/trustshell/products.py | 35 ++++++++++++++++----- tests/test_osidb.py | 24 +++++++-------- tests/test_products.py | 36 ++++++++++------------ 4 files changed, 87 insertions(+), 71 deletions(-) diff --git a/src/trustshell/osidb.py b/src/trustshell/osidb.py index da91ac8..c3a4055 100644 --- a/src/trustshell/osidb.py +++ b/src/trustshell/osidb.py @@ -25,27 +25,27 @@ def __init__(self) -> None: self.session = osidb_bindings.new_session(osidb_server_uri=endpoint) # type: ignore[attr-defined] @staticmethod - def parse_module_purl_tuples(tuples_list: list[str]) -> set[tuple[str, str]]: + def parse_stream_purl_tuples(tuples_list: list[str]) -> set[tuple[str, str]]: """ - Parses a list of "ps_module,purl" strings into a set of (ps_module, purl) tuples. + Parses a list of "ps_update_stream,purl" strings into a set of (ps_update_stream, purl) tuples. """ parsed_tuples = set() for item in tuples_list: parts = item.split(",", 1) # Split only on the first comma if len(parts) != 2: console.print( - f"Error: Invalid tuple format '{item}'. Expected 'ps_module,purl'.", + f"Error: Invalid tuple format '{item}'. Expected 'ps_update_stream,purl'.", style="error", ) sys.exit(1) - ps_module, purl = parts[0].strip(), parts[1].strip() - if not ps_module or not purl: + ps_update_stream, purl = parts[0].strip(), parts[1].strip() + if not ps_update_stream or not purl: console.print( - f"Error: ps_module or purl cannot be empty in '{item}'.", + f"Error: ps_update_stream or purl cannot be empty in '{item}'.", style="error", ) sys.exit(1) - parsed_tuples.add((ps_module, purl)) + parsed_tuples.add((ps_update_stream, purl)) return parsed_tuples @staticmethod @@ -53,7 +53,7 @@ def edit_tuples_in_editor( current_tuples: set[tuple[str, str]], ) -> set[tuple[str, str]]: """ - Opens the default text editor for the user to modify the ps_module/purl tuples. + Opens the default text editor for the user to modify the ps_update_stream/purl tuples. Returns the modified set of tuples. """ editor = os.environ.get("EDITOR", "vi") @@ -64,7 +64,9 @@ def edit_tuples_in_editor( temp_filepath = tf.name console.print(f"Opening editor '{editor}' for file: {temp_filepath}") - console.print("Please modify the ps_module,purl tuples and save the file.") + console.print( + "Please modify the ps_update_stream,purl tuples and save the file." + ) console.print("Each tuple should be on a new line.") try: @@ -91,18 +93,20 @@ def edit_tuples_in_editor( modified_lines = [ line.strip() for line in modified_content.splitlines() if line.strip() ] - return OSIDB.parse_module_purl_tuples(modified_lines) + return OSIDB.parse_stream_purl_tuples(modified_lines) def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None: console.print("Adding affects...") affects_data: list[dict[str, Any]] = [] for affect in affects_to_add: + ps_update_stream, purl = affect + osidb_affect = { "flaw": flaw.uuid, "embargoed": flaw.embargoed, - "ps_module": affect[0], + "ps_update_stream": ps_update_stream.strip(), "ps_component": None, - "purl": affect[1], + "purl": purl.strip(), } affects_data.append(osidb_affect) try: @@ -118,10 +122,10 @@ def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None: def edit_flaw_affects( self, flaw_id: str, - ps_module_purls: set[tuple[str, str]], + ps_stream_purls: set[tuple[str, str]], replace_mode: bool = False, ) -> None: - if not ps_module_purls: + if not ps_stream_purls: console.print("No new affects to add", style="warning") return @@ -136,10 +140,7 @@ def edit_flaw_affects( affects_by_state: dict[str, set[tuple[str, str]]] = defaultdict(set) for affect in flaw.affects: affects_by_state[affect.affectedness].add( - ( - affect.ps_module, - affect.purl, - ) + (affect.ps_update_stream, affect.purl) ) console.print("\n--- Existing Flaw Affects ---") @@ -153,16 +154,16 @@ def edit_flaw_affects( console.print("-----------------------------\n") console.print("New affects:") - for ps_module_purl in ps_module_purls: - console.print(ps_module_purl) + for ps_stream_purl in ps_stream_purls: + console.print(ps_stream_purl) # Optionally edit tuples in editor if click.confirm("Do you want to edit these affects?"): console.print("Entering editor mode to modify input tuples...") - ps_module_purls = self.edit_tuples_in_editor(ps_module_purls) + ps_stream_purls = self.edit_tuples_in_editor(ps_stream_purls) console.print("\n--- Modified Tuples from Editor ---") - if ps_module_purls: - for m, p in ps_module_purls: + if ps_stream_purls: + for m, p in ps_stream_purls: console.print(f" - {m},{p}") else: console.print(" (No tuples provided after editing)") @@ -170,11 +171,11 @@ def edit_flaw_affects( if not replace_mode: affects_to_add = ( - ps_module_purls - affects_by_state["NEW"] + ps_stream_purls - affects_by_state["NEW"] ) # Only truly new ones if not affects_to_add: console.print( - "No new ps_module/purl tuples to add. All provided are already present or in different states." + "No new ps_update_stream/purl tuples to add. All provided are already present or in different states." ) return @@ -187,7 +188,7 @@ def edit_flaw_affects( self.add_affects(flaw, affects_to_add) else: - if not affects_by_state["NEW"] and not ps_module_purls: + if not affects_by_state["NEW"] and not ps_stream_purls: console.print( "No existing 'NEW' affects to replace and no new affects provided. Nothing to do." ) @@ -202,8 +203,8 @@ def edit_flaw_affects( console.print("--------------------------------------------\n") console.print("\n--- New Affects that will REPLACE the above ---") - if ps_module_purls: - for affect in ps_module_purls: + if ps_stream_purls: + for affect in ps_stream_purls: console.print(affect) else: console.print(" (No new affects provided)") @@ -217,7 +218,7 @@ def edit_flaw_affects( existing_affects: dict[tuple[str, str], tuple[str, str]] = {} for affect in flaw.affects: if affect.purl: - existing_affects[(affect.ps_module, affect.purl)] = ( + existing_affects[(affect.ps_update_stream, affect.purl)] = ( affect.uuid, affect.affectedness, ) @@ -227,7 +228,7 @@ def edit_flaw_affects( existing_uuid, existing_affectedness = existing_value # Don't delete and re-add existing new affects if ( - existing_key not in ps_module_purls + existing_key not in ps_stream_purls and existing_affectedness == "NEW" ): try: @@ -241,4 +242,4 @@ def edit_flaw_affects( exit(1) # Add any new affects not already on the flaw in NEW state - self.add_affects(flaw, ps_module_purls) + self.add_affects(flaw, ps_stream_purls) diff --git a/src/trustshell/products.py b/src/trustshell/products.py index 1d07f0c..279ed63 100644 --- a/src/trustshell/products.py +++ b/src/trustshell/products.py @@ -22,7 +22,7 @@ paginated_trustify_query, ) from trustshell.osidb import OSIDB -from trustshell.product_definitions import ProdDefs, ProductModule +from trustshell.product_definitions import ProdDefs, ProductModule, ProductStream LATEST_ENDPOINT = f"{TRUSTIFY_URL}analysis/latest/component" ANALYSIS_ENDPOINT = f"{TRUSTIFY_URL}analysis/component" @@ -147,17 +147,36 @@ def _check_flaw(ctx: Any, param: Any, value: Any, dependent_option_name: str) -> def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]: - """Collect all the leaf and root node tuples. The root node is the direct parent of the CPE. - The leaf node type should be ProductModule""" + """Collect all the leaf and root node tuples for OSIDB affects. + + Extracts (ps_update_stream, purl) tuples where: + - ps_update_stream comes from ProductStream parent of ProductModule leaf nodes + - purl comes from ancestor package components in the dependency tree + """ affects = set() + seen_streams = set() for tree in ancestor_trees: - ps_module_nodes = set() + ps_module_nodes = [] for leaf in tree.leaves: if isinstance(leaf, ProductModule): - ps_module_nodes.add(leaf) - if len(ps_module_nodes) > 1: - raise ValueError(f"More than one ProductModule found in {tree.root.name}") + ps_module_nodes.append(leaf) + for ps_module_node in ps_module_nodes: + # Extract ps_update_stream from ProductStream parent + if not ps_module_node.parent or not isinstance( + ps_module_node.parent, ProductStream + ): + logger.debug( + f"ProductModule {ps_module_node.name} has no ProductStream parent, skipping" + ) + continue + + ps_update_stream = ps_module_node.parent.name + + if ps_update_stream in seen_streams: + continue + seen_streams.add(ps_update_stream) + for ancestor in ps_module_node.ancestors: # Find the root component if ancestor.name.startswith("pkg:"): @@ -180,7 +199,7 @@ def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]: affects.add( ( - ps_module_node.name, + ps_update_stream, purl.to_string(), ) ) diff --git a/tests/test_osidb.py b/tests/test_osidb.py index 57a1c45..87d2016 100644 --- a/tests/test_osidb.py +++ b/tests/test_osidb.py @@ -6,22 +6,22 @@ class TestOSIDB(unittest.TestCase): - def test_parse_module_purl_tuples(self): - input_list = ["ps_module1,purl1", "ps_module2,purl2"] - expected_output = {("ps_module1", "purl1"), ("ps_module2", "purl2")} - assert OSIDB.parse_module_purl_tuples(input_list) == expected_output + def test_parse_stream_purl_tuples(self): + input_list = ["rhel-9.4.z,purl1", "rhel-9.2.z,purl2"] + expected_output = {("rhel-9.4.z", "purl1"), ("rhel-9.2.z", "purl2")} + assert OSIDB.parse_stream_purl_tuples(input_list) == expected_output - def test_parse_module_purl_tuples_invalid_format(self): - input_list = ["ps_module1", "ps_module2,purl2"] + def test_parse_stream_purl_tuples_invalid_format(self): + input_list = ["rhel-9.4.z", "rhel-9.2.z,purl2"] with pytest.raises(SystemExit): - OSIDB.parse_module_purl_tuples(input_list) + OSIDB.parse_stream_purl_tuples(input_list) - def test_parse_module_purl_empty_ps_module(self): + def test_parse_stream_purl_empty_ps_update_stream(self): input_list = [",purl1"] with pytest.raises(SystemExit): - OSIDB.parse_module_purl_tuples(input_list) + OSIDB.parse_stream_purl_tuples(input_list) - def test_parse_module_purl_empty_purl(self): - input_list = ["ps_module1,"] + def test_parse_stream_purl_empty_purl(self): + input_list = ["rhel-9.4.z,"] with pytest.raises(SystemExit): - OSIDB.parse_module_purl_tuples(input_list) + OSIDB.parse_stream_purl_tuples(input_list) diff --git a/tests/test_products.py b/tests/test_products.py index d9da36e..cb7df05 100644 --- a/tests/test_products.py +++ b/tests/test_products.py @@ -422,14 +422,13 @@ def test_extract_affects_container_cdx(self, mock_service): affects = extract_affects(ancestor_trees) print(f"\nExtracted affects: {affects}") - # For now, just assert that we got some result - # The user will adjust assertions based on the printed output - assert affects == { - ( - "quay-3", - "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8", + assert len(affects) == 2 + for affect in affects: + assert affect[0] in ["quay-3.12", "quay-3.13"] + assert ( + affect[1] + == "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8" ) - } @patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service") def test_extract_affects_container_cdx_no_cpes(self, mock_service): @@ -454,14 +453,13 @@ def test_extract_affects_container_cdx_no_cpes(self, mock_service): affects = extract_affects(ancestor_trees) print(f"\nExtracted affects: {affects}") - # For now, just assert that we got some result - # The user will adjust assertions based on the printed output - assert affects == { - ( - "quay-3", - "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8", + assert len(affects) == 2 + for affect in affects: + assert affect[0] in ["quay-3.12", "quay-3.13"] + assert ( + affect[1] + == "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8" ) - } @patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service") def test_extract_affects_maven(self, mock_service): @@ -487,12 +485,10 @@ def test_extract_affects_maven(self, mock_service): print(f"\nExtracted affects: {affects}") # We expect the root level maven PURL, not the generic one - assert affects == { - ( - "quay-3", - "pkg:maven/io.quay/hey@1.2.3.redhat-00001?type=jar", - ) - } + assert len(affects) == 2 + for affect in affects: + assert affect[0] in ["quay-3.12", "quay-3.13"] # ps_update_stream + assert affect[1] == "pkg:maven/io.quay/hey@1.2.3.redhat-00001?type=jar" @patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service") def test_no_duplicates_from_product_mappings(self, mock_service): From d0ca4d12b88c89d560c01b1375e9136a46912ffe Mon Sep 17 00:00:00 2001 From: jasinner Date: Mon, 3 Nov 2025 12:26:41 +1000 Subject: [PATCH 2/3] upgrade osidb-bindings --- uv.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/uv.lock b/uv.lock index ea7d359..b56452f 100644 --- a/uv.lock +++ b/uv.lock @@ -436,7 +436,7 @@ wheels = [ [[package]] name = "osidb-bindings" -version = "4.12.0" +version = "5.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, @@ -445,9 +445,9 @@ dependencies = [ { name = "requests" }, { name = "requests-gssapi" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/82/82/210a7128862f5e85204f417ac4ecee0d086c6da60868d5e34f0279cd19da/osidb_bindings-4.12.0.tar.gz", hash = "sha256:7e76308ea99410fcad01460c2094434ee13282884b8419c61c14c1c07370e1a6", size = 193905, upload-time = "2025-07-04T08:53:05.102Z" } +sdist = { url = "https://files.pythonhosted.org/packages/81/5b/3805555897d86a6c3fb786133cf3c9022357456e7c507882163097e18909/osidb_bindings-5.0.0.tar.gz", hash = "sha256:06d638ff447286f14fe3d39696fa5683cdbbdd00d022d7196105c147b161d124", size = 279354, upload-time = "2025-11-01T11:05:23.528Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/48/6a/12ff54948b5281770d5a38c611f149c0c6f45e3d68afcf314a5a0ddf4963/osidb_bindings-4.12.0-py3-none-any.whl", hash = "sha256:ae9eec8d76338e1a7975925b86bb207894b4ad988b9e4c4668ee517a30d7ed3a", size = 545303, upload-time = "2025-07-04T08:53:03.942Z" }, + { url = "https://files.pythonhosted.org/packages/b2/a6/7ed613d411428e89f9072b7311b73f1ecf97d578551e1c07433e2d873678/osidb_bindings-5.0.0-py3-none-any.whl", hash = "sha256:97762480ccd905f26ae629309083cace1845886897a37b8104050b67ef87d45e", size = 698227, upload-time = "2025-11-01T11:05:22.134Z" }, ] [[package]] @@ -712,7 +712,7 @@ wheels = [ [[package]] name = "trustshell" -version = "0.1.2" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "anytree" }, From eb6cd27bf1095b251350aec86c73a03c4c30516f Mon Sep 17 00:00:00 2001 From: jasinner Date: Mon, 3 Nov 2025 12:27:01 +1000 Subject: [PATCH 3/3] bump version 0.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 68b0199..f87212e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "trustshell" -version = "0.1.2" +version = "0.2.0" description = "Command Line tool for Trustify" readme = "README.md" authors = [