Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "trustshell"
version = "0.1.2"
version = "0.2.0"
description = "Command Line tool for Trustify"
readme = "README.md"
authors = [
Expand Down
63 changes: 32 additions & 31 deletions src/trustshell/osidb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,35 @@ def __init__(self) -> None:
self.session = osidb_bindings.new_session(osidb_server_uri=endpoint) # type: ignore[attr-defined]

@staticmethod
def parse_module_purl_tuples(tuples_list: list[str]) -> set[tuple[str, str]]:
def parse_stream_purl_tuples(tuples_list: list[str]) -> set[tuple[str, str]]:
"""
Parses a list of "ps_module,purl" strings into a set of (ps_module, purl) tuples.
Parses a list of "ps_update_stream,purl" strings into a set of (ps_update_stream, purl) tuples.
"""
parsed_tuples = set()
for item in tuples_list:
parts = item.split(",", 1) # Split only on the first comma
if len(parts) != 2:
console.print(
f"Error: Invalid tuple format '{item}'. Expected 'ps_module,purl'.",
f"Error: Invalid tuple format '{item}'. Expected 'ps_update_stream,purl'.",
style="error",
)
sys.exit(1)
ps_module, purl = parts[0].strip(), parts[1].strip()
if not ps_module or not purl:
ps_update_stream, purl = parts[0].strip(), parts[1].strip()
if not ps_update_stream or not purl:
console.print(
f"Error: ps_module or purl cannot be empty in '{item}'.",
f"Error: ps_update_stream or purl cannot be empty in '{item}'.",
style="error",
)
sys.exit(1)
parsed_tuples.add((ps_module, purl))
parsed_tuples.add((ps_update_stream, purl))
return parsed_tuples

@staticmethod
def edit_tuples_in_editor(
current_tuples: set[tuple[str, str]],
) -> set[tuple[str, str]]:
"""
Opens the default text editor for the user to modify the ps_module/purl tuples.
Opens the default text editor for the user to modify the ps_update_stream/purl tuples.
Returns the modified set of tuples.
"""
editor = os.environ.get("EDITOR", "vi")
Expand All @@ -64,7 +64,9 @@ def edit_tuples_in_editor(
temp_filepath = tf.name

console.print(f"Opening editor '{editor}' for file: {temp_filepath}")
console.print("Please modify the ps_module,purl tuples and save the file.")
console.print(
"Please modify the ps_update_stream,purl tuples and save the file."
)
console.print("Each tuple should be on a new line.")

try:
Expand All @@ -91,18 +93,20 @@ def edit_tuples_in_editor(
modified_lines = [
line.strip() for line in modified_content.splitlines() if line.strip()
]
return OSIDB.parse_module_purl_tuples(modified_lines)
return OSIDB.parse_stream_purl_tuples(modified_lines)

def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None:
console.print("Adding affects...")
affects_data: list[dict[str, Any]] = []
for affect in affects_to_add:
ps_update_stream, purl = affect

osidb_affect = {
"flaw": flaw.uuid,
"embargoed": flaw.embargoed,
"ps_module": affect[0],
"ps_update_stream": ps_update_stream.strip(),
"ps_component": None,
"purl": affect[1],
"purl": purl.strip(),
}
affects_data.append(osidb_affect)
try:
Expand All @@ -118,10 +122,10 @@ def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None:
def edit_flaw_affects(
self,
flaw_id: str,
ps_module_purls: set[tuple[str, str]],
ps_stream_purls: set[tuple[str, str]],
replace_mode: bool = False,
) -> None:
if not ps_module_purls:
if not ps_stream_purls:
console.print("No new affects to add", style="warning")
return

Expand All @@ -136,10 +140,7 @@ def edit_flaw_affects(
affects_by_state: dict[str, set[tuple[str, str]]] = defaultdict(set)
for affect in flaw.affects:
affects_by_state[affect.affectedness].add(
(
affect.ps_module,
affect.purl,
)
(affect.ps_update_stream, affect.purl)
)

console.print("\n--- Existing Flaw Affects ---")
Expand All @@ -153,28 +154,28 @@ def edit_flaw_affects(
console.print("-----------------------------\n")

console.print("New affects:")
for ps_module_purl in ps_module_purls:
console.print(ps_module_purl)
for ps_stream_purl in ps_stream_purls:
console.print(ps_stream_purl)

# Optionally edit tuples in editor
if click.confirm("Do you want to edit these affects?"):
console.print("Entering editor mode to modify input tuples...")
ps_module_purls = self.edit_tuples_in_editor(ps_module_purls)
ps_stream_purls = self.edit_tuples_in_editor(ps_stream_purls)
console.print("\n--- Modified Tuples from Editor ---")
if ps_module_purls:
for m, p in ps_module_purls:
if ps_stream_purls:
for m, p in ps_stream_purls:
console.print(f" - {m},{p}")
else:
console.print(" (No tuples provided after editing)")
console.print("-----------------------------------\n")

if not replace_mode:
affects_to_add = (
ps_module_purls - affects_by_state["NEW"]
ps_stream_purls - affects_by_state["NEW"]
) # Only truly new ones
if not affects_to_add:
console.print(
"No new ps_module/purl tuples to add. All provided are already present or in different states."
"No new ps_update_stream/purl tuples to add. All provided are already present or in different states."
)
return

Expand All @@ -187,7 +188,7 @@ def edit_flaw_affects(
self.add_affects(flaw, affects_to_add)

else:
if not affects_by_state["NEW"] and not ps_module_purls:
if not affects_by_state["NEW"] and not ps_stream_purls:
console.print(
"No existing 'NEW' affects to replace and no new affects provided. Nothing to do."
)
Expand All @@ -202,8 +203,8 @@ def edit_flaw_affects(
console.print("--------------------------------------------\n")

console.print("\n--- New Affects that will REPLACE the above ---")
if ps_module_purls:
for affect in ps_module_purls:
if ps_stream_purls:
for affect in ps_stream_purls:
console.print(affect)
else:
console.print(" (No new affects provided)")
Expand All @@ -217,7 +218,7 @@ def edit_flaw_affects(
existing_affects: dict[tuple[str, str], tuple[str, str]] = {}
for affect in flaw.affects:
if affect.purl:
existing_affects[(affect.ps_module, affect.purl)] = (
existing_affects[(affect.ps_update_stream, affect.purl)] = (
affect.uuid,
affect.affectedness,
)
Expand All @@ -227,7 +228,7 @@ def edit_flaw_affects(
existing_uuid, existing_affectedness = existing_value
# Don't delete and re-add existing new affects
if (
existing_key not in ps_module_purls
existing_key not in ps_stream_purls
and existing_affectedness == "NEW"
):
try:
Expand All @@ -241,4 +242,4 @@ def edit_flaw_affects(
exit(1)

# Add any new affects not already on the flaw in NEW state
self.add_affects(flaw, ps_module_purls)
self.add_affects(flaw, ps_stream_purls)
35 changes: 27 additions & 8 deletions src/trustshell/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
paginated_trustify_query,
)
from trustshell.osidb import OSIDB
from trustshell.product_definitions import ProdDefs, ProductModule
from trustshell.product_definitions import ProdDefs, ProductModule, ProductStream

LATEST_ENDPOINT = f"{TRUSTIFY_URL}analysis/latest/component"
ANALYSIS_ENDPOINT = f"{TRUSTIFY_URL}analysis/component"
Expand Down Expand Up @@ -147,17 +147,36 @@ def _check_flaw(ctx: Any, param: Any, value: Any, dependent_option_name: str) ->


def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]:
"""Collect all the leaf and root node tuples. The root node is the direct parent of the CPE.
The leaf node type should be ProductModule"""
"""Collect all the leaf and root node tuples for OSIDB affects.

Extracts (ps_update_stream, purl) tuples where:
- ps_update_stream comes from ProductStream parent of ProductModule leaf nodes
- purl comes from ancestor package components in the dependency tree
"""
affects = set()
seen_streams = set()
for tree in ancestor_trees:
ps_module_nodes = set()
ps_module_nodes = []
for leaf in tree.leaves:
if isinstance(leaf, ProductModule):
ps_module_nodes.add(leaf)
if len(ps_module_nodes) > 1:
raise ValueError(f"More than one ProductModule found in {tree.root.name}")
ps_module_nodes.append(leaf)

for ps_module_node in ps_module_nodes:
# Extract ps_update_stream from ProductStream parent
if not ps_module_node.parent or not isinstance(
ps_module_node.parent, ProductStream
):
logger.debug(
f"ProductModule {ps_module_node.name} has no ProductStream parent, skipping"
)
continue

ps_update_stream = ps_module_node.parent.name

if ps_update_stream in seen_streams:
continue
seen_streams.add(ps_update_stream)

for ancestor in ps_module_node.ancestors:
# Find the root component
if ancestor.name.startswith("pkg:"):
Expand All @@ -180,7 +199,7 @@ def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]:

affects.add(
(
ps_module_node.name,
ps_update_stream,
purl.to_string(),
)
)
Expand Down
24 changes: 12 additions & 12 deletions tests/test_osidb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@


class TestOSIDB(unittest.TestCase):
def test_parse_module_purl_tuples(self):
input_list = ["ps_module1,purl1", "ps_module2,purl2"]
expected_output = {("ps_module1", "purl1"), ("ps_module2", "purl2")}
assert OSIDB.parse_module_purl_tuples(input_list) == expected_output
def test_parse_stream_purl_tuples(self):
input_list = ["rhel-9.4.z,purl1", "rhel-9.2.z,purl2"]
expected_output = {("rhel-9.4.z", "purl1"), ("rhel-9.2.z", "purl2")}
assert OSIDB.parse_stream_purl_tuples(input_list) == expected_output

def test_parse_module_purl_tuples_invalid_format(self):
input_list = ["ps_module1", "ps_module2,purl2"]
def test_parse_stream_purl_tuples_invalid_format(self):
input_list = ["rhel-9.4.z", "rhel-9.2.z,purl2"]
with pytest.raises(SystemExit):
OSIDB.parse_module_purl_tuples(input_list)
OSIDB.parse_stream_purl_tuples(input_list)

def test_parse_module_purl_empty_ps_module(self):
def test_parse_stream_purl_empty_ps_update_stream(self):
input_list = [",purl1"]
with pytest.raises(SystemExit):
OSIDB.parse_module_purl_tuples(input_list)
OSIDB.parse_stream_purl_tuples(input_list)

def test_parse_module_purl_empty_purl(self):
input_list = ["ps_module1,"]
def test_parse_stream_purl_empty_purl(self):
input_list = ["rhel-9.4.z,"]
with pytest.raises(SystemExit):
OSIDB.parse_module_purl_tuples(input_list)
OSIDB.parse_stream_purl_tuples(input_list)
36 changes: 16 additions & 20 deletions tests/test_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,13 @@ def test_extract_affects_container_cdx(self, mock_service):
affects = extract_affects(ancestor_trees)
print(f"\nExtracted affects: {affects}")

# For now, just assert that we got some result
# The user will adjust assertions based on the printed output
assert affects == {
(
"quay-3",
"pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8",
assert len(affects) == 2
for affect in affects:
assert affect[0] in ["quay-3.12", "quay-3.13"]
assert (
affect[1]
== "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8"
)
}

@patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service")
def test_extract_affects_container_cdx_no_cpes(self, mock_service):
Expand All @@ -454,14 +453,13 @@ def test_extract_affects_container_cdx_no_cpes(self, mock_service):
affects = extract_affects(ancestor_trees)
print(f"\nExtracted affects: {affects}")

# For now, just assert that we got some result
# The user will adjust assertions based on the printed output
assert affects == {
(
"quay-3",
"pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8",
assert len(affects) == 2
for affect in affects:
assert affect[0] in ["quay-3.12", "quay-3.13"]
assert (
affect[1]
== "pkg:oci/quay-builder-qemu-rhcos-rhel8?repository_url=registry.access.redhat.com/quay/quay-builder-qemu-rhcos-rhel8"
)
}

@patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service")
def test_extract_affects_maven(self, mock_service):
Expand All @@ -487,12 +485,10 @@ def test_extract_affects_maven(self, mock_service):
print(f"\nExtracted affects: {affects}")

# We expect the root level maven PURL, not the generic one
assert affects == {
(
"quay-3",
"pkg:maven/io.quay/hey@1.2.3.redhat-00001?type=jar",
)
}
assert len(affects) == 2
for affect in affects:
assert affect[0] in ["quay-3.12", "quay-3.13"] # ps_update_stream
assert affect[1] == "pkg:maven/io.quay/hey@1.2.3.redhat-00001?type=jar"

@patch("trustshell.product_definitions.ProdDefs.get_product_definitions_service")
def test_no_duplicates_from_product_mappings(self, mock_service):
Expand Down
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading