Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import re
import json
import base64
import hashlib
import urllib.request
import urllib.error
Expand Down Expand Up @@ -74,7 +75,9 @@ def licenses(self, value: list) -> None:
else:
self._licenses = value

def _get_md5_hash(self, path_to_scan: str = "") -> str:
def _get_hash(self, path_to_scan: str = "") -> tuple:
wfp = ""
md5_hex = ""
try:
file_path = self.source_name_or_path
if path_to_scan and not os.path.isabs(file_path):
Expand All @@ -86,18 +89,26 @@ def _get_md5_hash(self, path_to_scan: str = "") -> str:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
md5_hex = md5_hash.hexdigest()
try:
from scanoss.winnowing import Winnowing
wfp = Winnowing().wfp_for_file(file_path, self.source_name_or_path) or ""
except Exception as e:
logger.debug(f"Failed to get WFP for {self.source_name_or_path}: {e}")
except FileNotFoundError:
logger.warning(f"File not found: {self.source_name_or_path}")
except PermissionError:
logger.warning(f"Permission denied: {self.source_name_or_path}")
except Exception as e:
logger.warning(f"Failed to compute MD5 for {self.source_name_or_path}: {e}")
return ""
return md5_hex, wfp

def _get_origin_url_from_md5_hash(self, md5_hash: str) -> str:
def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str:
try:
request = urllib.request.Request(KB_URL, data=json.dumps({"file_hash": md5_hash}).encode('utf-8'), method='POST')
payload = {"file_hash": md5_hash}
if wfp and wfp.strip():
payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii")
request = urllib.request.Request(KB_URL, data=json.dumps(payload).encode('utf-8'), method='POST')
request.add_header('Accept', 'application/json')
request.add_header('Content-Type', 'application/json')

Expand Down Expand Up @@ -168,9 +179,9 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None:
else:
item = OssItem(self.oss_name, self.oss_version, self.licenses)
if run_kb and not self.is_license_text:
md5_hash = self._get_md5_hash(path_to_scan)
md5_hash, wfp = self._get_hash(path_to_scan)
if md5_hash:
origin_url = self._get_origin_url_from_md5_hash(md5_hash)
origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp)
if origin_url:
extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url)
if extracted_name:
Expand Down
Loading