Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 96 additions & 19 deletions src/specify_cli/commands/bundle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,14 @@ def catalog_remove(
console.print(f"[green]✓[/green] Removed catalog source '{removed}'.")


# ZIP magic-byte signatures used to detect .zip payloads from REST API asset
# URLs, which carry no file extension. The three signatures cover all valid
# ZIP variants (PK\x03\x04 = local file header, PK\x05\x06 = empty archive,
# PK\x07\x08 = spanning marker) without the false-positive risk of checking
# only the 2-byte "PK" prefix.
_ZIP_SIGNATURES = (b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08")


# ===== internal helpers =====


Expand Down Expand Up @@ -794,41 +802,110 @@ def _download_remote_manifest(entry_id: str, url: str):
"""Fetch a remote bundle artifact over HTTPS and extract its manifest."""
import io
import tempfile
from pathlib import PurePosixPath
from urllib.parse import urlparse as _urlparse

import yaml as _yaml

from ...authentication.http import open_url
from ...authentication.http import github_provider_hosts, open_url
from ..._github_http import resolve_github_release_asset_api_url
from ...bundler.models.manifest import BundleManifest

def _validate_redirect(old_url: str, new_url: str) -> None:
_require_https(f"bundle '{entry_id}'", new_url)

_require_https(f"bundle '{entry_id}'", url)

# For private/SSO-protected GitHub repos, browser release download URLs
# (https://github.com/<owner>/<repo>/releases/download/<tag>/<asset>)
# redirect to an HTML/SSO page instead of delivering the asset. Resolve
# such URLs to the GitHub REST API asset URL so the authenticated client
# can download the actual file.
extra_headers = None
effective_url = url
resolved = resolve_github_release_asset_api_url(
url, open_url, timeout=30, github_hosts=github_provider_hosts()
)
if resolved:
effective_url = resolved
_require_https(f"bundle '{entry_id}'", effective_url)
extra_headers = {"Accept": "application/octet-stream"}

# Human-readable description of where the bytes came from, reused across
# all post-download error messages so failures point at the catalog URL
# (and resolved API URL, if any) instead of an opaque temp path.
if effective_url != url:
_source_desc = f"{url} (resolved to {effective_url})"
else:
_source_desc = url

try:
with open_url(url, timeout=30, redirect_validator=_validate_redirect) as resp:
with open_url(
effective_url,
timeout=30,
redirect_validator=_validate_redirect,
extra_headers=extra_headers,
) as resp:
_require_https(f"bundle '{entry_id}'", resp.geturl())
raw = resp.read()
except BundlerError:
raise
except Exception as exc: # noqa: BLE001
raise BundlerError(f"Failed to download bundle '{entry_id}' from {url}: {exc}") from exc
# Report the original catalog URL so users know which entry to fix,
# and include the resolved URL when it differs for easier debugging.
raise BundlerError(
f"Failed to download bundle '{entry_id}' from {_source_desc}: {exc}"
) from exc

# A .zip artifact is written to a temp file and parsed via the local-source
# path (which extracts bundle.yml); any other payload is treated as YAML.
if url.lower().endswith(".zip"):
with tempfile.TemporaryDirectory() as tmp:
artifact = Path(tmp) / "bundle.zip"
artifact.write_bytes(raw)
manifest = _local_manifest_source(str(artifact))
if manifest is None:
raise BundlerError(
f"Downloaded artifact for bundle '{entry_id}' is not a valid bundle."
)
return manifest

import yaml as _yaml

from ...bundler.models.manifest import BundleManifest
# Detection uses the path component of the original catalog URL (via
# PurePosixPath so query strings and fragments are ignored, and URL paths
# are always treated as POSIX regardless of host OS), falling back to the
# module-level _ZIP_SIGNATURES magic-byte check for direct REST API asset
# URLs which carry no file extension.
_url_ext = PurePosixPath(_urlparse(url).path).suffix.lower()
try:
if _url_ext == ".zip" or raw[:4] in _ZIP_SIGNATURES:
with tempfile.TemporaryDirectory() as tmp:
artifact = Path(tmp) / "bundle.zip"
artifact.write_bytes(raw)
# Wrap ZIP parsing so any failure (BadZipFile, missing
# bundle.yml, etc.) references the source URL rather than the
# opaque temporary path, consistent with the download-error
# handling above.
try:
manifest = _local_manifest_source(str(artifact))
except Exception as exc: # noqa: BLE001
raise BundlerError(
f"Downloaded artifact for bundle '{entry_id}' from "
f"{_source_desc} is not a valid bundle: {exc}"
) from exc
# _local_manifest_source returns None only when the file does
# not exist; since we just wrote *artifact* that cannot happen
# here. The explicit guard ensures callers never receive None
# and silently degrade instead of raising a clear error.
if manifest is None:
raise BundlerError(
f"Downloaded artifact for bundle '{entry_id}' from "
f"{_source_desc} is not a valid bundle."
)
return manifest

data = _yaml.safe_load(io.BytesIO(raw))
return BundleManifest.from_dict(data)
data = _yaml.safe_load(io.BytesIO(raw))
return BundleManifest.from_dict(data)
except BundlerError:
raise
except _yaml.YAMLError as exc:
raise BundlerError(
f"Downloaded content for bundle '{entry_id}' from {_source_desc} "
f"is not valid YAML: {exc}"
) from exc
except Exception as exc: # noqa: BLE001
raise BundlerError(
f"Failed to parse downloaded bundle '{entry_id}' from "
f"{_source_desc}: {exc}"
) from exc


def register(app: typer.Typer) -> None:
Expand Down
Loading
Loading