Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 192 additions & 29 deletions sbomify_action/spdx3.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ProfileIdentifierType,
Relationship,
RelationshipType,
SoftwareAgent,
SpdxDocument,
Tool,
)
Expand All @@ -53,6 +54,20 @@

from .logging_config import logger


class Spdx3Payload(Payload):
"""Thin wrapper around :class:`Payload` that carries passthrough elements.

Elements whose ``type`` is not handled by the parser (e.g. security,
build, licensing types) are stored verbatim so the writer can
re-attach them to the output without data loss.
"""

def __init__(self) -> None:
super().__init__()
self.passthrough_elements: list[dict] = []


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
Expand All @@ -73,6 +88,26 @@
"software_Sbom": "Sbom",
}

# Reverse mapping: model class name → JSON-LD type prefix (e.g. "Package" → "software_Package")
_REVERSE_TYPE_ALIASES: dict[str, str] = {v: k for k, v in _TYPE_ALIASES.items()}

# Property names the spdx_tools converter outputs WITHOUT the software_ prefix,
# but the SPDX 3.0.1 JSON-LD context REQUIRES the prefix. Used by the writer
# to fix serialized output and by the parser to accept both forms.
# Maps: converter_name → context_name
_SOFTWARE_PROPERTY_RENAMES: dict[str, str] = {
"packageVersion": "software_packageVersion",
"downloadLocation": "software_downloadLocation",
"packageUrl": "software_packageUrl",
"homepage": "software_homePage",
"sourceInfo": "software_sourceInfo",
"copyrightText": "software_copyrightText",
"primaryPurpose": "software_primaryPurpose",
"additionalPurpose": "software_additionalPurpose",
"attributionText": "software_attributionText",
"contentIdentifier": "software_contentIdentifier",
}

# Map HashAlgorithm enum names (upper) to enum values
_HASH_ALGORITHMS: dict[str, HashAlgorithm] = {a.name.lower(): a for a in HashAlgorithm}

Expand Down Expand Up @@ -163,6 +198,20 @@ def extract_spdx3_version(data: dict) -> str | None:
# ---------------------------------------------------------------------------


def _get_sw(elem: dict, unprefixed: str, prefixed: str | None = None):
"""Return value from *elem* trying *unprefixed* then *software_*-prefixed key.

Needed because the spdx_tools converter writes ``packageVersion`` while
the SPDX 3.0.1 context defines ``software_packageVersion``. Real SBOMs
(Yocto / OpenEmbedded) use the spec-correct prefixed form.
"""
if unprefixed in elem:
return elem[unprefixed]
if prefixed is None:
prefixed = f"software_{unprefixed}"
return elem.get(prefixed)


def _parse_creation_info(ci_dict: dict) -> CreationInfo:
"""Parse a nested ``creationInfo`` dict into a :class:`CreationInfo`."""
spec_str = ci_dict.get("specVersion", "3.0.1")
Expand Down Expand Up @@ -300,16 +349,22 @@ def _parse_common_fields(elem: dict, creation_info_map: dict[str, CreationInfo]


def _parse_software_artifact_fields(elem: dict, fields: dict) -> None:
"""Parse fields specific to SoftwareArtifact subclasses (Package, File)."""
for json_key, py_key in [
"""Parse fields specific to SoftwareArtifact subclasses (Package, File).

Each software-profile property is looked up with both the unprefixed name
(as output by the spdx_tools converter) and the ``software_``-prefixed name
(as required by the SPDX 3.0.1 JSON-LD context and used by real tools).
"""
for unprefixed, py_key in [
("contentIdentifier", "content_identifier"),
("copyrightText", "copyright_text"),
("attributionText", "attribution_text"),
]:
if json_key in elem:
fields[py_key] = elem[json_key]
val = _get_sw(elem, unprefixed)
if val is not None:
fields[py_key] = val

# suppliedBy / originatedBy
# suppliedBy / originatedBy — Core properties, no software_ prefix needed
for json_key, py_key in [
("suppliedBy", "supplied_by"),
("originatedBy", "originated_by"),
Expand All @@ -319,7 +374,7 @@ def _parse_software_artifact_fields(elem: dict, fields: dict) -> None:
fields[py_key] = [val] if isinstance(val, str) else list(val)

# Primary purpose
pp = elem.get("primaryPurpose")
pp = _get_sw(elem, "primaryPurpose")
if pp:
pp_key = pp.lower()
if pp_key in _SW_PURPOSES:
Expand All @@ -328,13 +383,13 @@ def _parse_software_artifact_fields(elem: dict, fields: dict) -> None:
logger.warning("Unrecognized primaryPurpose value %r; omitting", pp)

# Additional purposes
aps = elem.get("additionalPurpose", [])
aps = _get_sw(elem, "additionalPurpose") or []
if isinstance(aps, str):
aps = [aps]
if aps:
fields["additional_purpose"] = [_SW_PURPOSES[a.lower()] for a in aps if a.lower() in _SW_PURPOSES]

# Dates
# Dates — Core properties, no software_ prefix needed
for json_key, py_key in [
("builtTime", "built_time"),
("releaseTime", "release_time"),
Expand All @@ -345,8 +400,8 @@ def _parse_software_artifact_fields(elem: dict, fields: dict) -> None:
date_str = date_str.replace("Z", "+00:00")
fields[py_key] = datetime.fromisoformat(date_str)

# Standards
stds = elem.get("standard", [])
# Standards — context uses "standardName", spdx_tools may output "standard"
stds = elem.get("standardName") or elem.get("standard", [])
Comment thread
vpetersson marked this conversation as resolved.
if isinstance(stds, str):
stds = [stds]
if stds:
Expand All @@ -358,16 +413,18 @@ def _parse_package(elem: dict, ci_map: dict[str, CreationInfo] | None = None) ->
fields = _parse_common_fields(elem, ci_map)
_parse_software_artifact_fields(elem, fields)

# Package-specific fields
for json_key, py_key in [
("packageVersion", "package_version"),
("downloadLocation", "download_location"),
("packageUrl", "package_url"),
("homepage", "homepage"),
("sourceInfo", "source_info"),
# Package-specific fields — accept both unprefixed and software_-prefixed
# Note: "homepage" maps to "software_homePage" in the context (different casing)
for unprefixed, prefixed, py_key in [
("packageVersion", "software_packageVersion", "package_version"),
("downloadLocation", "software_downloadLocation", "download_location"),
("packageUrl", "software_packageUrl", "package_url"),
("homepage", "software_homePage", "homepage"),
("sourceInfo", "software_sourceInfo", "source_info"),
]:
if json_key in elem:
fields[py_key] = elem[json_key]
val = _get_sw(elem, unprefixed, prefixed)
if val is not None:
fields[py_key] = val

# Ensure required 'name' field
if "name" not in fields:
Expand Down Expand Up @@ -429,14 +486,16 @@ def _parse_relationship(elem: dict, ci_map: dict[str, CreationInfo] | None = Non
return Relationship(**fields)


def _parse_agent(elem: dict, cls: type, ci_map: dict[str, CreationInfo] | None = None) -> Organization | Person | Tool:
"""Parse an Organization, Person, or Tool element."""
def _parse_agent(
elem: dict, cls: type, ci_map: dict[str, CreationInfo] | None = None
) -> Organization | Person | SoftwareAgent | Tool:
"""Parse an Organization, Person, SoftwareAgent, or Tool element."""
fields = _parse_common_fields(elem, ci_map)
return cls(**fields)


def parse_spdx3_file(file_path: str) -> Payload:
"""Parse an SPDX 3 JSON-LD file into a :class:`Payload`.
def parse_spdx3_file(file_path: str) -> Spdx3Payload:
"""Parse an SPDX 3 JSON-LD file into an :class:`Spdx3Payload`.

Maps ``@graph`` elements by their ``type`` (``@type``) to the
corresponding ``spdx_tools.spdx3.model`` classes.
Expand All @@ -445,7 +504,7 @@ def parse_spdx3_file(file_path: str) -> Payload:
file_path: Path to the SPDX 3 JSON-LD ``.json`` file.

Returns:
Populated :class:`Payload` with all parsed elements.
Populated :class:`Spdx3Payload` with all parsed elements.

Raises:
FileNotFoundError: If the file doesn't exist.
Expand All @@ -462,8 +521,8 @@ def parse_spdx3_file(file_path: str) -> Payload:
return parse_spdx3_data(data)


def parse_spdx3_data(data: dict) -> Payload:
"""Parse SPDX 3 JSON-LD data (already loaded) into a :class:`Payload`.
def parse_spdx3_data(data: dict) -> Spdx3Payload:
"""Parse SPDX 3 JSON-LD data (already loaded) into an :class:`Spdx3Payload`.

Handles both ``@graph``-based documents and top-level element documents
(where the root object itself is the element, without ``@graph``).
Expand All @@ -487,7 +546,7 @@ def parse_spdx3_data(data: dict) -> Payload:
ci_map[ci_id] = _parse_creation_info(elem)

# Second pass: parse all other elements
payload = Payload()
payload = Spdx3Payload()

for elem in graph:
if not isinstance(elem, dict):
Expand All @@ -510,15 +569,23 @@ def parse_spdx3_data(data: dict) -> Payload:
payload.add_element(_parse_agent(elem, Person, ci_map))
elif elem_type == "Tool":
payload.add_element(_parse_agent(elem, Tool, ci_map))
elif elem_type == "SoftwareAgent":
payload.add_element(_parse_agent(elem, SoftwareAgent, ci_map))
Comment thread
vpetersson marked this conversation as resolved.
elif elem_type == "Relationship":
payload.add_element(_parse_relationship(elem, ci_map))
elif elem_type == "CreationInfo":
pass # Already handled in first pass
# Already parsed in first pass; preserve standalone CreationInfo
# elements (those with @id) so passthrough elements referencing
# them by IRI string survive the roundtrip.
if elem.get("@id") or elem.get("spdxId"):
payload.passthrough_elements.append(elem)
else:
logger.debug(f"Skipping unknown SPDX 3 element type: {elem_type}")
logger.debug(f"Passing through unhandled SPDX 3 element type: {elem_type}")
payload.passthrough_elements.append(elem)
except Exception as e:
spdx_id = elem.get("@id") or elem.get("spdxId", "unknown")
logger.warning(f"Failed to parse SPDX 3 element {spdx_id} (type={elem_type}): {e}")
payload.passthrough_elements.append(elem)

return payload

Expand All @@ -528,6 +595,86 @@ def parse_spdx3_data(data: dict) -> Payload:
# ---------------------------------------------------------------------------


def _normalize_serialized_element(elem: dict) -> None:
"""Fix spdx_tools converter output to match SPDX 3.0.1 JSON-LD context.

Modifies *elem* **in place**:

1. ``@type`` → ``type``, ``@id`` → ``spdxId`` (context aliases)
2. Restore type prefixes: ``Package`` → ``software_Package``, etc.
3. Rename software-profile properties to their ``software_``-prefixed
form as required by the JSON-LD context.
4. Recurse into nested dicts (creationInfo, verifiedUsing, etc.)
"""
# --- key normalization: @type → type, @id → spdxId ---
if "@type" in elem:
elem["type"] = elem.pop("@type")
if "@id" in elem:
elem["spdxId"] = elem.pop("@id")

# --- type prefix restoration ---
etype = elem.get("type", "")
if etype in _REVERSE_TYPE_ALIASES:
elem["type"] = _REVERSE_TYPE_ALIASES[etype]

# --- software property prefix ---
for old_key, new_key in _SOFTWARE_PROPERTY_RENAMES.items():
if old_key in elem:
elem[new_key] = elem.pop(old_key)

# --- other property renames (non-software) ---
# spdx_tools outputs "standard" but context defines "standardName"
if "standard" in elem:
elem["standardName"] = elem.pop("standard")

# --- recurse into nested dicts / lists ---
for value in elem.values():
if isinstance(value, dict):
_normalize_nested_dict(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
_normalize_nested_dict(item)


def _normalize_nested_dict(d: dict) -> None:
"""Normalize ``@type`` → ``type`` in a nested dict (creationInfo, Hash, etc.)."""
if "@type" in d:
d["type"] = d.pop("@type")
# Recurse for deeper nesting (e.g. ExternalIdentifier inside verifiedUsing)
for value in d.values():
if isinstance(value, dict):
_normalize_nested_dict(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
_normalize_nested_dict(item)


def _normalize_passthrough_element(elem: dict) -> None:
"""Normalize keys on a passthrough element for output consistency.

Like :func:`_normalize_serialized_element` but only touches keys
(``@type`` → ``type``, ``@id`` → ``spdxId``). ``@id`` is only
renamed when the value is an IRI; blank-node identifiers (``_:…``)
are left as ``@id`` because ``spdxId`` must be an IRI per the spec.
"""
if "@type" in elem:
elem["type"] = elem.pop("@type")
if "@id" in elem:
id_val = elem["@id"]
if isinstance(id_val, str) and not id_val.startswith("_:"):
elem["spdxId"] = elem.pop("@id")
# Recurse into nested dicts (e.g. inline creationInfo, externalIdentifier)
for value in elem.values():
if isinstance(value, dict):
_normalize_nested_dict(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
_normalize_nested_dict(item)


def write_spdx3_file(
payload: Payload,
file_path: str,
Expand All @@ -545,6 +692,22 @@ def write_spdx3_file(
"""
element_list = convert_payload_to_json_ld_list_of_elements(payload)

# Post-process serialized elements to fix spdx_tools converter output:
# 1. Normalize @type→type, @id→spdxId (context aliases)
# 2. Restore type prefixes (e.g. "Package" → "software_Package")
# 3. Add software_ prefix to property names (e.g. "packageVersion" → "software_packageVersion")
for elem in element_list:
_normalize_serialized_element(elem)

# Re-attach passthrough elements that were not parsed into model objects.
# Normalize their keys for consistency, but preserve blank-node @id values
# (e.g. "_:CreationInfo0") since spdxId must be an IRI per the spec.
passthrough = payload.passthrough_elements if isinstance(payload, Spdx3Payload) else []
if passthrough:
Comment thread
vpetersson marked this conversation as resolved.
for elem in passthrough:
_normalize_passthrough_element(elem)
element_list.extend(passthrough)

complete_dict = {"@context": context_url, "@graph": element_list}

with open(file_path, "w", encoding="utf-8") as f:
Expand Down
Loading