From 1a6efd144dd0463cbb6121bac95cf27e0c19561c Mon Sep 17 00:00:00 2001 From: Soim Kim Date: Wed, 11 Feb 2026 10:55:01 +0900 Subject: [PATCH] Get wfp for getting kb data --- src/fosslight_source/_scan_item.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/fosslight_source/_scan_item.py b/src/fosslight_source/_scan_item.py index c943055..e9b293e 100644 --- a/src/fosslight_source/_scan_item.py +++ b/src/fosslight_source/_scan_item.py @@ -7,6 +7,7 @@ import logging import re import json +import base64 import hashlib import urllib.request import urllib.error @@ -74,7 +75,9 @@ def licenses(self, value: list) -> None: else: self._licenses = value - def _get_md5_hash(self, path_to_scan: str = "") -> str: + def _get_hash(self, path_to_scan: str = "") -> tuple: + wfp = "" + md5_hex = "" try: file_path = self.source_name_or_path if path_to_scan and not os.path.isabs(file_path): @@ -86,18 +89,26 @@ def _get_md5_hash(self, path_to_scan: str = "") -> str: with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): md5_hash.update(chunk) - return md5_hash.hexdigest() + md5_hex = md5_hash.hexdigest() + try: + from scanoss.winnowing import Winnowing + wfp = Winnowing().wfp_for_file(file_path, self.source_name_or_path) or "" + except Exception as e: + logger.debug(f"Failed to get WFP for {self.source_name_or_path}: {e}") except FileNotFoundError: logger.warning(f"File not found: {self.source_name_or_path}") except PermissionError: logger.warning(f"Permission denied: {self.source_name_or_path}") except Exception as e: logger.warning(f"Failed to compute MD5 for {self.source_name_or_path}: {e}") - return "" + return md5_hex, wfp - def _get_origin_url_from_md5_hash(self, md5_hash: str) -> str: + def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str: try: - request = urllib.request.Request(KB_URL, data=json.dumps({"file_hash": md5_hash}).encode('utf-8'), method='POST') + payload = {"file_hash": md5_hash} + if wfp and wfp.strip(): + payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii") + request = urllib.request.Request(KB_URL, data=json.dumps(payload).encode('utf-8'), method='POST') request.add_header('Accept', 'application/json') request.add_header('Content-Type', 'application/json') @@ -168,9 +179,9 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None: else: item = OssItem(self.oss_name, self.oss_version, self.licenses) if run_kb and not self.is_license_text: - md5_hash = self._get_md5_hash(path_to_scan) + md5_hash, wfp = self._get_hash(path_to_scan) if md5_hash: - origin_url = self._get_origin_url_from_md5_hash(md5_hash) + origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp) if origin_url: extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url) if extracted_name: