diff --git a/tests/cli/integration/test_init_command.py b/tests/cli/integration/test_init_command.py
index 22d7225..10b4bdd 100644
--- a/tests/cli/integration/test_init_command.py
+++ b/tests/cli/integration/test_init_command.py
@@ -76,3 +76,53 @@ def test_init_non_interactive_requires_context_fields(runner, monkeypatch, tmp_p
],
)
assert result.exit_code == 2
+
+
+def test_init_interactive_respects_provided_options_and_keeps_active_profile(
+ runner, monkeypatch, tmp_path
+) -> None:
+ config_path = tmp_path / "config.json"
+ monkeypatch.setattr(paths, "config_file", lambda: config_path)
+
+ seed_result = runner.invoke(
+ app,
+ [
+ "--json",
+ "init",
+ "--name",
+ "seed",
+ "--env",
+ "DEMO",
+ "--context-type",
+ "nip",
+ "--context-value",
+ "111",
+ "--non-interactive",
+ "--set-active",
+ ],
+ )
+ assert seed_result.exit_code == 0
+
+ result = runner.invoke(
+ app,
+ [
+ "--json",
+ "init",
+ "--name",
+ "second",
+ "--env",
+ "TEST",
+ "--base-url",
+ "https://example.test",
+ "--context-type",
+ "nip",
+ "--context-value",
+ "222",
+ ],
+ input="second\n",
+ )
+ assert result.exit_code == 0
+ payload = _json_output(result.stdout)
+ assert payload["ok"] is True
+ assert payload["profile"] == "second"
+ assert payload["data"]["active_profile"] == "seed"
diff --git a/tests/cli/unit/test_auth_manager.py b/tests/cli/unit/test_auth_manager.py
index 354f07e..acefce3 100644
--- a/tests/cli/unit/test_auth_manager.py
+++ b/tests/cli/unit/test_auth_manager.py
@@ -180,6 +180,15 @@ def test_status_and_logout(monkeypatch) -> None:
def test_select_certificate_and_require_non_empty_errors() -> None:
+ cert = manager._select_certificate(
+ [
+ {"usage": ["KsefTokenEncryption"], "certificate": ""},
+ {"usage": ["KsefTokenEncryption"], "certificate": "CERT"},
+ ],
+ "KsefTokenEncryption",
+ )
+ assert cert == "CERT"
+
with pytest.raises(CliError) as cert_error:
manager._select_certificate([], "KsefTokenEncryption")
assert cert_error.value.code == ExitCode.API_ERROR
@@ -214,10 +223,31 @@ def test_resolve_base_url_uses_profile_when_missing(monkeypatch) -> None:
assert manager.resolve_base_url(None, profile="demo") == "https://profile.example"
+def test_resolve_base_url_falls_back_when_profile_base_url_empty(monkeypatch) -> None:
+ monkeypatch.setattr(
+ manager,
+ "load_config",
+ lambda: CliConfig(
+ active_profile="demo",
+ profiles={
+ "demo": ProfileConfig(
+ name="demo",
+ env="DEMO",
+ base_url=" ",
+ context_type="nip",
+ context_value="123",
+ )
+ },
+ ),
+ )
+ assert manager.resolve_base_url(None, profile="demo") == manager.KsefEnvironment.DEMO.value
+
+
def test_resolve_lighthouse_base_url_prefers_explicit_value() -> None:
- assert manager.resolve_lighthouse_base_url(" https://api-latarnia-test.ksef.mf.gov.pl/ ") == (
- "https://api-latarnia-test.ksef.mf.gov.pl/"
- ).strip()
+ assert (
+ manager.resolve_lighthouse_base_url(" https://api-latarnia-test.ksef.mf.gov.pl/ ")
+ == ("https://api-latarnia-test.ksef.mf.gov.pl/").strip()
+ )
def test_resolve_lighthouse_base_url_uses_profile_mapping(monkeypatch) -> None:
@@ -269,6 +299,28 @@ def test_resolve_lighthouse_base_url_invalid_profile_base_fallback(monkeypatch)
)
+def test_resolve_lighthouse_base_url_fallback_when_profile_base_url_empty(monkeypatch) -> None:
+ monkeypatch.setattr(
+ manager,
+ "load_config",
+ lambda: CliConfig(
+ active_profile="demo",
+ profiles={
+ "demo": ProfileConfig(
+ name="demo",
+ env="DEMO",
+ base_url=" ",
+ context_type="nip",
+ context_value="123",
+ )
+ },
+ ),
+ )
+ assert manager.resolve_lighthouse_base_url(None, profile="demo") == (
+ KsefLighthouseEnvironment.TEST.value
+ )
+
+
def test_refresh_access_token_missing_token_in_response(monkeypatch) -> None:
class _FakeClientNoToken:
def __init__(self) -> None:
@@ -492,3 +544,60 @@ def test_login_with_xades_loader_errors_are_mapped(monkeypatch) -> None:
save=False,
)
assert exc.value.code == ExitCode.VALIDATION_ERROR
+
+
+def test_login_with_token_without_save(monkeypatch) -> None:
+ monkeypatch.setattr(manager, "create_client", lambda base_url: _FakeClient())
+ monkeypatch.setattr(manager, "AuthCoordinator", _FakeAuthCoordinator)
+ monkeypatch.setattr(
+ manager,
+ "save_tokens",
+ lambda *args, **kwargs: (_ for _ in ()).throw(
+ AssertionError("save_tokens should not be called")
+ ),
+ )
+ monkeypatch.setattr(
+ manager,
+ "set_cached_metadata",
+ lambda *args, **kwargs: (_ for _ in ()).throw(
+ AssertionError("set_cached_metadata should not be called")
+ ),
+ )
+
+ result = manager.login_with_token(
+ profile="demo",
+ base_url="https://api-demo.ksef.mf.gov.pl",
+ token="TOKEN",
+ context_type="nip",
+ context_value="5265877635",
+ poll_interval=0.0,
+ max_attempts=1,
+ save=False,
+ )
+ assert result["saved"] is False
+
+
+def test_refresh_access_token_success_without_save(monkeypatch) -> None:
+ monkeypatch.setattr(manager, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(manager, "create_client", lambda base_url: _FakeClient())
+ monkeypatch.setattr(
+ manager,
+ "save_tokens",
+ lambda *args, **kwargs: (_ for _ in ()).throw(
+ AssertionError("save_tokens should not be called")
+ ),
+ )
+ monkeypatch.setattr(
+ manager,
+ "set_cached_metadata",
+ lambda *args, **kwargs: (_ for _ in ()).throw(
+ AssertionError("set_cached_metadata should not be called")
+ ),
+ )
+
+ result = manager.refresh_access_token(
+ profile="demo",
+ base_url="https://api-demo.ksef.mf.gov.pl",
+ save=False,
+ )
+ assert result["saved"] is False
diff --git a/tests/cli/unit/test_config_loader_profiles.py b/tests/cli/unit/test_config_loader_profiles.py
index 9d8dd2e..d6549c4 100644
--- a/tests/cli/unit/test_config_loader_profiles.py
+++ b/tests/cli/unit/test_config_loader_profiles.py
@@ -82,6 +82,18 @@ def test_loader_profile_parser_skips_invalid_entries(monkeypatch, tmp_path) -> N
assert loader.load_config().profiles == {}
+def test_loader_accepts_non_dict_profiles_key(monkeypatch, tmp_path) -> None:
+ config_path = tmp_path / "config.json"
+ monkeypatch.setattr(paths, "config_file", lambda: config_path)
+ config_path.write_text(
+ json.dumps({"active_profile": "demo", "profiles": []}),
+ encoding="utf-8",
+ )
+ loaded = loader.load_config()
+ assert loaded.profiles == {}
+ assert loaded.active_profile is None
+
+
def test_loader_skips_non_string_profile_names(monkeypatch, tmp_path) -> None:
config_path = tmp_path / "config.json"
monkeypatch.setattr(paths, "config_file", lambda: config_path)
@@ -293,3 +305,27 @@ def test_profiles_upsert_and_active_fallback() -> None:
profiles.delete_profile(config, name="one")
assert config.active_profile == "two"
+
+
+def test_profiles_delete_non_active_profile_keeps_active() -> None:
+ config = CliConfig(
+ active_profile="one",
+ profiles={
+ "one": ProfileConfig(
+ name="one",
+ env="DEMO",
+ base_url="https://api-demo.ksef.mf.gov.pl",
+ context_type="nip",
+ context_value="1",
+ ),
+ "two": ProfileConfig(
+ name="two",
+ env="TEST",
+ base_url="https://api-test.ksef.mf.gov.pl",
+ context_type="nip",
+ context_value="2",
+ ),
+ },
+ )
+ profiles.delete_profile(config, name="two")
+ assert config.active_profile == "one"
diff --git a/tests/cli/unit/test_core_coverage.py b/tests/cli/unit/test_core_coverage.py
index 871e867..8d00ec6 100644
--- a/tests/cli/unit/test_core_coverage.py
+++ b/tests/cli/unit/test_core_coverage.py
@@ -44,6 +44,18 @@ def _fake_entrypoint() -> None:
assert called["value"] is True
+def test_cli_main_module_import_does_not_invoke_entrypoint(monkeypatch) -> None:
+ called = {"value": False}
+
+ def _fake_entrypoint() -> None:
+ called["value"] = True
+
+ monkeypatch.setattr(app_module, "app_entrypoint", _fake_entrypoint)
+ main_module = importlib.import_module("ksef_client.cli.__main__")
+ importlib.reload(main_module)
+ assert called["value"] is False
+
+
def test_config_loader_and_profiles(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setattr(paths, "config_file", lambda: tmp_path / "config.json")
cfg = loader.load_config()
diff --git a/tests/cli/unit/test_output_human.py b/tests/cli/unit/test_output_human.py
index cab509d..175acf6 100644
--- a/tests/cli/unit/test_output_human.py
+++ b/tests/cli/unit/test_output_human.py
@@ -61,6 +61,14 @@ def test_human_renderer_success_skips_raw_response_payload(capsys) -> None:
assert "raw" not in out
+def test_human_renderer_success_without_data(capsys) -> None:
+ renderer = HumanRenderer(no_color=True)
+ renderer.success(command="send.status", profile="demo", data=None)
+ out = capsys.readouterr().out
+ assert "OK" in out
+ assert "send.status" in out
+
+
def test_human_renderer_error_prints_hint(capsys) -> None:
renderer = HumanRenderer(no_color=True)
renderer.error(
diff --git a/tests/cli/unit/test_sdk_adapters.py b/tests/cli/unit/test_sdk_adapters.py
index 7bcc4d9..e82f4c7 100644
--- a/tests/cli/unit/test_sdk_adapters.py
+++ b/tests/cli/unit/test_sdk_adapters.py
@@ -2187,3 +2187,392 @@ def get_public_key_certificates(self):
check_certs=False,
)
assert fail["overall"] == "FAIL"
+
+
+def test_build_zip_from_directory_skips_non_file_xml_paths(tmp_path) -> None:
+ (tmp_path / "nested").mkdir()
+ (tmp_path / "nested" / "invoice.xml").mkdir()
+ with pytest.raises(CliError) as exc:
+ adapters._build_zip_from_directory(str(tmp_path))
+ assert exc.value.code == ExitCode.VALIDATION_ERROR
+
+
+def test_wait_helpers_include_details_in_error_hints(monkeypatch) -> None:
+ monkeypatch.setattr(adapters.time, "sleep", lambda _: None)
+
+ class _InvoiceStatusClient:
+ class sessions:
+ @staticmethod
+ def get_session_invoice_status(session_ref, invoice_ref, access_token):
+ _ = (session_ref, invoice_ref, access_token)
+ return {"status": {"code": 400, "description": "bad", "details": ["d1"]}}
+
+ class _SessionStatusClient:
+ class sessions:
+ @staticmethod
+ def get_session_status(session_ref, access_token):
+ _ = (session_ref, access_token)
+ return {"status": {"code": 400, "description": "bad", "details": ["d2"]}}
+
+ class _ExportStatusClient:
+ class invoices:
+ @staticmethod
+ def get_export_status(reference_number, access_token):
+ _ = (reference_number, access_token)
+ return {"status": {"code": 400, "description": "bad", "details": ["d3"]}}
+
+ with pytest.raises(CliError) as invoice_exc:
+ adapters._wait_for_invoice_status(
+ client=_InvoiceStatusClient(),
+ session_ref="S",
+ invoice_ref="I",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert "Details: d1" in (invoice_exc.value.hint or "")
+
+ with pytest.raises(CliError) as session_exc:
+ adapters._wait_for_session_status(
+ client=_SessionStatusClient(),
+ session_ref="S",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert "Details: d2" in (session_exc.value.hint or "")
+
+ with pytest.raises(CliError) as export_exc:
+ adapters._wait_for_export_status(
+ client=_ExportStatusClient(),
+ reference_number="R",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert "Details: d3" in (export_exc.value.hint or "")
+
+
+def test_wait_helpers_error_hints_without_details(monkeypatch) -> None:
+ monkeypatch.setattr(adapters.time, "sleep", lambda _: None)
+
+ class _InvoiceStatusClient:
+ class sessions:
+ @staticmethod
+ def get_session_invoice_status(session_ref, invoice_ref, access_token):
+ _ = (session_ref, invoice_ref, access_token)
+ return {"status": {"code": 400, "description": "bad"}}
+
+ class _SessionStatusClient:
+ class sessions:
+ @staticmethod
+ def get_session_status(session_ref, access_token):
+ _ = (session_ref, access_token)
+ return {"status": {"code": 400, "description": "bad"}}
+
+ class _ExportStatusClient:
+ class invoices:
+ @staticmethod
+ def get_export_status(reference_number, access_token):
+ _ = (reference_number, access_token)
+ return {"status": {"code": 400, "description": "bad"}}
+
+ with pytest.raises(CliError) as invoice_exc:
+ adapters._wait_for_invoice_status(
+ client=_InvoiceStatusClient(),
+ session_ref="S",
+ invoice_ref="I",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert invoice_exc.value.hint == "bad"
+
+ with pytest.raises(CliError) as session_exc:
+ adapters._wait_for_session_status(
+ client=_SessionStatusClient(),
+ session_ref="S",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert session_exc.value.hint == "bad"
+
+ with pytest.raises(CliError) as export_exc:
+ adapters._wait_for_export_status(
+ client=_ExportStatusClient(),
+ reference_number="R",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert export_exc.value.hint == "bad"
+
+
+def test_wait_helpers_retry_on_empty_upo_bytes(monkeypatch) -> None:
+ monkeypatch.setattr(adapters.time, "sleep", lambda _: None)
+
+ class _InvoiceUpoClient:
+ class sessions:
+ @staticmethod
+ def get_session_invoice_upo_by_ref(session_ref, invoice_ref, access_token):
+ _ = (session_ref, invoice_ref, access_token)
+ return b""
+
+ class _BatchUpoClient:
+ class sessions:
+ @staticmethod
+ def get_session_upo(session_ref, upo_ref, access_token):
+ _ = (session_ref, upo_ref, access_token)
+ return b""
+
+ with pytest.raises(CliError) as invoice_exc:
+ adapters._wait_for_invoice_upo(
+ client=_InvoiceUpoClient(),
+ session_ref="S",
+ invoice_ref="I",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert invoice_exc.value.code == ExitCode.RETRY_EXHAUSTED
+
+ with pytest.raises(CliError) as batch_exc:
+ adapters._wait_for_batch_upo(
+ client=_BatchUpoClient(),
+ session_ref="S",
+ upo_ref="U",
+ access_token="acc",
+ poll_interval=0.01,
+ max_attempts=1,
+ )
+ assert batch_exc.value.code == ExitCode.RETRY_EXHAUSTED
+
+
+def test_wait_for_upo_invoice_ref_pending_status_times_out(monkeypatch) -> None:
+ class _Sessions:
+ @staticmethod
+ def get_session_invoice_status(session_ref, invoice_ref, access_token):
+ _ = (session_ref, invoice_ref, access_token)
+ return {"status": {"code": 100}}
+
+ monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(
+ adapters,
+ "create_client",
+ lambda base_url, access_token=None: _FakeClient(sessions=_Sessions()),
+ )
+ monkeypatch.setattr(adapters.time, "sleep", lambda _: None)
+
+ with pytest.raises(CliError) as exc:
+ adapters.wait_for_upo(
+ profile="demo",
+ base_url="https://example.invalid",
+ session_ref="SES-PENDING",
+ invoice_ref="INV-PENDING",
+ upo_ref=None,
+ batch_auto=False,
+ poll_interval=0.01,
+ max_attempts=1,
+ out=None,
+ overwrite=False,
+ )
+ assert exc.value.code == ExitCode.RETRY_EXHAUSTED
+
+
+def test_send_online_invoice_success_without_waits(monkeypatch, tmp_path) -> None:
+ class _Security:
+ @staticmethod
+ def get_public_key_certificates():
+ return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}]
+
+ class _Sessions:
+ pass
+
+ class _OnlineWorkflow:
+ def __init__(self, sessions):
+ _ = sessions
+
+ def open_session(self, *, form_code, public_certificate, access_token, upo_v43=False):
+ _ = (form_code, public_certificate, access_token, upo_v43)
+ return SimpleNamespace(
+ session_reference_number="SES-NO-WAIT",
+ encryption_data=SimpleNamespace(key=b"k", iv=b"i"),
+ )
+
+ def send_invoice(self, **kwargs):
+ _ = kwargs
+ return {"referenceNumber": "INV-NO-WAIT"}
+
+ def close_session(self, reference_number, access_token):
+ _ = (reference_number, access_token)
+
+ monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(adapters, "OnlineSessionWorkflow", _OnlineWorkflow)
+ monkeypatch.setattr(
+ adapters,
+ "create_client",
+ lambda base_url, access_token=None: _FakeClient(sessions=_Sessions(), security=_Security()),
+ )
+
+ invoice_path = tmp_path / "invoice.xml"
+ invoice_path.write_text("", encoding="utf-8")
+ result = adapters.send_online_invoice(
+ profile="demo",
+ base_url="https://example.invalid",
+ invoice=str(invoice_path),
+ system_code="FA (3)",
+ schema_version="1-0E",
+ form_value="FA",
+ upo_v43=False,
+ wait_status=False,
+ wait_upo=False,
+ poll_interval=0.01,
+ max_attempts=1,
+ save_upo=None,
+ )
+ assert result == {"session_ref": "SES-NO-WAIT", "invoice_ref": "INV-NO-WAIT"}
+
+
+def test_send_online_invoice_wait_status_without_wait_upo(monkeypatch, tmp_path) -> None:
+ class _Security:
+ @staticmethod
+ def get_public_key_certificates():
+ return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}]
+
+ class _Sessions:
+ pass
+
+ class _OnlineWorkflow:
+ def __init__(self, sessions):
+ _ = sessions
+
+ def open_session(self, *, form_code, public_certificate, access_token, upo_v43=False):
+ _ = (form_code, public_certificate, access_token, upo_v43)
+ return SimpleNamespace(
+ session_reference_number="SES-STATUS-ONLY",
+ encryption_data=SimpleNamespace(key=b"k", iv=b"i"),
+ )
+
+ def send_invoice(self, **kwargs):
+ _ = kwargs
+ return {"referenceNumber": "INV-STATUS-ONLY"}
+
+ def close_session(self, reference_number, access_token):
+ _ = (reference_number, access_token)
+
+ monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(adapters, "OnlineSessionWorkflow", _OnlineWorkflow)
+ monkeypatch.setattr(
+ adapters,
+ "create_client",
+ lambda base_url, access_token=None: _FakeClient(sessions=_Sessions(), security=_Security()),
+ )
+ monkeypatch.setattr(
+ adapters,
+ "_wait_for_invoice_status",
+ lambda **kwargs: {
+ "status": {"code": 200, "description": "Accepted"},
+ "ksefNumber": "KSEF-X",
+ },
+ )
+
+ invoice_path = tmp_path / "invoice.xml"
+ invoice_path.write_text("", encoding="utf-8")
+ result = adapters.send_online_invoice(
+ profile="demo",
+ base_url="https://example.invalid",
+ invoice=str(invoice_path),
+ system_code="FA (3)",
+ schema_version="1-0E",
+ form_value="FA",
+ upo_v43=False,
+ wait_status=True,
+ wait_upo=False,
+ poll_interval=0.01,
+ max_attempts=1,
+ save_upo=None,
+ )
+ assert result["ksef_number"] == "KSEF-X"
+ assert "upo_bytes" not in result
+
+
+def test_send_batch_invoices_wait_status_without_wait_upo(monkeypatch, tmp_path) -> None:
+ class _Security:
+ @staticmethod
+ def get_public_key_certificates():
+ return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}]
+
+ class _Sessions:
+ @staticmethod
+ def get_session_status(session_ref, access_token):
+ _ = (session_ref, access_token)
+ return {"status": {"code": 200, "description": "Done"}, "upoReferenceNumber": "UPO-1"}
+
+ class _BatchWorkflow:
+ def __init__(self, sessions, http_client):
+ _ = (sessions, http_client)
+
+ @staticmethod
+ def open_upload_and_close(**kwargs):
+ _ = kwargs
+ return "SES-BATCH-STATUS"
+
+ monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(adapters, "BatchSessionWorkflow", _BatchWorkflow)
+ monkeypatch.setattr(
+ adapters,
+ "create_client",
+ lambda base_url, access_token=None: _FakeClient(
+ sessions=_Sessions(), security=_Security(), http_client=SimpleNamespace()
+ ),
+ )
+ monkeypatch.setattr(adapters.time, "sleep", lambda _: None)
+
+ zip_path = tmp_path / "batch.zip"
+ zip_path.write_bytes(b"PK\x03\x04")
+ result = adapters.send_batch_invoices(
+ profile="demo",
+ base_url="https://example.invalid",
+ zip_path=str(zip_path),
+ directory=None,
+ system_code="FA (3)",
+ schema_version="1-0E",
+ form_value="FA",
+ parallelism=1,
+ upo_v43=False,
+ wait_status=True,
+ wait_upo=False,
+ poll_interval=0.01,
+ max_attempts=1,
+ save_upo=None,
+ )
+ assert result["session_ref"] == "SES-BATCH-STATUS"
+ assert result["upo_ref"] == "UPO-1"
+ assert "upo_path" not in result
+
+
+def test_run_health_check_ignores_non_list_usage_values(monkeypatch) -> None:
+ class _SecurityClient:
+ @staticmethod
+ def get_public_key_certificates():
+ return [
+ {"usage": "KsefTokenEncryption", "certificate": "A"},
+ {"usage": ["SymmetricKeyEncryption"], "certificate": "B"},
+ ]
+
+ monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref"))
+ monkeypatch.setattr(
+ adapters,
+ "create_client",
+ lambda base_url, access_token=None: _FakeClient(security=_SecurityClient()),
+ )
+ result = adapters.run_health_check(
+ profile="demo",
+ base_url="https://example.invalid",
+ dry_run=True,
+ check_auth=False,
+ check_certs=False,
+ )
+ cert_check = [item for item in result["checks"] if item["name"] == "certificates"][0]
+ assert cert_check["status"] == "FAIL"
diff --git a/tests/test_base64url.py b/tests/test_base64url.py
index 86f2ea7..bace5c8 100644
--- a/tests/test_base64url.py
+++ b/tests/test_base64url.py
@@ -16,6 +16,12 @@ def test_standard_base64(self):
decoded = b64decode(encoded)
self.assertEqual(decoded, data)
+ def test_standard_base64_decode_bytes_input(self):
+ data = b"data"
+ encoded = b64encode(data).encode("ascii")
+ decoded = b64decode(encoded)
+ self.assertEqual(decoded, data)
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_clients.py b/tests/test_clients.py
index 54874ac..c48577b 100644
--- a/tests/test_clients.py
+++ b/tests/test_clients.py
@@ -10,11 +10,12 @@
AsyncInvoicesClient,
InvoicesClient,
_normalize_datetime_without_offset,
+ _normalize_invoice_date_range_payload,
)
from ksef_client.clients.lighthouse import AsyncLighthouseClient, LighthouseClient
from ksef_client.clients.limits import AsyncLimitsClient, LimitsClient
from ksef_client.clients.peppol import AsyncPeppolClient, PeppolClient
-from ksef_client.clients.permissions import AsyncPermissionsClient, PermissionsClient
+from ksef_client.clients.permissions import AsyncPermissionsClient, PermissionsClient, _page_params
from ksef_client.clients.rate_limits import AsyncRateLimitsClient, RateLimitsClient
from ksef_client.clients.security import AsyncSecurityClient, SecurityClient
from ksef_client.clients.sessions import AsyncSessionsClient, SessionsClient
@@ -76,6 +77,15 @@ def test_auth_client(self):
client.redeem_token("auth")
client.refresh_access_token("refresh")
+ def test_auth_client_get_active_sessions_without_optional_filters(self):
+ client = AuthClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.get_active_sessions(continuation_token="", access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
def test_sessions_client(self):
client = SessionsClient(self.http)
with (
@@ -113,6 +123,46 @@ def test_sessions_client(self):
client.get_session_invoice_upo_by_ksef("ref", "ksef", access_token="token")
client.get_session_upo("ref", "upo", access_token="token")
+ def test_sessions_client_without_optional_filters(self):
+ client = SessionsClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.get_sessions(
+ session_type="online",
+ continuation_token="",
+ reference_number="",
+ date_created_from="",
+ date_created_to="",
+ date_closed_from="",
+ date_closed_to="",
+ date_modified_from="",
+ date_modified_to="",
+ statuses=[],
+ access_token="token",
+ )
+ self.assertEqual(
+ request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}
+ )
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+
+ def test_sessions_client_without_upo_feature_and_pagination(self):
+ client = SessionsClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.open_online_session({"a": 1}, access_token="token")
+ client.open_batch_session({"a": 1}, access_token="token")
+ client.get_session_invoices("ref", continuation_token="", access_token="token")
+ client.get_session_failed_invoices("ref", continuation_token="", access_token="token")
+
+ self.assertIsNone(request_json_mock.call_args_list[0].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[1].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[2].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[2].kwargs["params"])
+ self.assertIsNone(request_json_mock.call_args_list[3].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[3].kwargs["params"])
+
def test_invoices_client(self):
client = InvoicesClient(self.http)
query_payload = {
@@ -175,6 +225,23 @@ def test_invoices_client(self):
"2025-07-02T11:15:00+02:00",
)
+ def test_invoices_client_query_metadata_without_optional_params(self):
+ client = InvoicesClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
+ def test_normalize_invoice_date_range_payload_passthrough_branches(self):
+ payload = {
+ "dateRange": {"from": 1, "to": None},
+ "filters": {"dateRange": "invalid"},
+ }
+ normalized = _normalize_invoice_date_range_payload(payload)
+ self.assertEqual(normalized["dateRange"]["from"], 1)
+ self.assertIsNone(normalized["dateRange"]["to"])
+
def test_normalize_datetime_without_offset_passthrough_branches(self):
self.assertEqual(_normalize_datetime_without_offset("2025-01-02"), "2025-01-02")
@@ -215,6 +282,9 @@ def test_permissions_client(self):
)
client.query_subunits_grants(payload, page_offset=0, page_size=10, access_token="token")
+ def test_permissions_page_params_without_values(self):
+ self.assertEqual(_page_params(None, None), {})
+
def test_certificates_client(self):
client = CertificatesClient(self.http)
with patch.object(client, "_request_json", Mock(return_value={"ok": True})):
@@ -232,6 +302,14 @@ def test_certificates_client(self):
client.retrieve_certificate(payload, access_token="token")
client.revoke_certificate("serial", payload, access_token="token")
+ def test_certificates_client_query_without_pagination(self):
+ client = CertificatesClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.query_certificates({"a": 1}, access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
def test_tokens_client(self):
client = TokensClient(self.http)
with patch.object(client, "_request_json", Mock(return_value={"ok": True})):
@@ -248,6 +326,22 @@ def test_tokens_client(self):
client.get_token_status("ref", access_token="token")
client.revoke_token("ref", access_token="token")
+ def test_tokens_client_list_tokens_without_optional_filters(self):
+ client = TokensClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.list_tokens(
+ access_token="token",
+ statuses=[],
+ description="",
+ author_identifier="",
+ author_identifier_type="",
+ continuation_token="",
+ )
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+
def test_limits_clients(self):
client = LimitsClient(self.http)
with patch.object(client, "_request_json", Mock(return_value={"ok": True})):
@@ -289,6 +383,14 @@ def test_peppol_client(self):
with patch.object(client, "_request_json", Mock(return_value={"ok": True})):
client.list_providers(page_offset=0, page_size=10)
+ def test_peppol_client_without_pagination(self):
+ client = PeppolClient(self.http)
+ with patch.object(
+ client, "_request_json", Mock(return_value={"ok": True})
+ ) as request_json_mock:
+ client.list_providers()
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
def test_lighthouse_client(self):
client = LighthouseClient(self.http, "https://api-latarnia-test.ksef.mf.gov.pl")
with patch.object(
@@ -561,6 +663,25 @@ async def test_async_clients(self):
await tokens.get_token_status("ref", access_token="token")
await tokens.revoke_token("ref", access_token="token")
+ async def test_async_tokens_client_list_tokens_without_optional_filters(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ payload = {"a": 1}
+ tokens = AsyncTokensClient(http)
+ with patch.object(
+ tokens, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await tokens.list_tokens(
+ access_token="token",
+ statuses=[],
+ description="",
+ author_identifier="",
+ author_identifier_type="",
+ continuation_token="",
+ )
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+
limits = AsyncLimitsClient(http)
with patch.object(limits, "_request_json", AsyncMock(return_value={"ok": True})):
await limits.get_context_limits("token")
@@ -598,6 +719,93 @@ async def test_async_clients(self):
with patch.object(peppol, "_request_json", AsyncMock(return_value={"ok": True})):
await peppol.list_providers(page_offset=0, page_size=10)
+ async def test_async_peppol_client_without_pagination(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ peppol = AsyncPeppolClient(http)
+ with patch.object(
+ peppol, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await peppol.list_providers()
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
+ async def test_async_auth_client_get_active_sessions_without_optional_filters(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ auth = AsyncAuthClient(http)
+ with patch.object(
+ auth, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await auth.get_active_sessions(continuation_token="", access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
+ async def test_async_certificates_client_query_without_pagination(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ certificates = AsyncCertificatesClient(http)
+ with patch.object(
+ certificates, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await certificates.query_certificates({"a": 1}, access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
+ async def test_async_invoices_client_query_metadata_without_optional_params(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ invoices = AsyncInvoicesClient(http)
+ with patch.object(
+ invoices, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await invoices.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token")
+ self.assertIsNone(request_json_mock.call_args.kwargs["params"])
+
+ async def test_async_sessions_client_without_optional_filters(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ sessions = AsyncSessionsClient(http)
+ with patch.object(
+ sessions, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await sessions.get_sessions(
+ session_type="online",
+ continuation_token="",
+ reference_number="",
+ date_created_from="",
+ date_created_to="",
+ date_closed_from="",
+ date_closed_to="",
+ date_modified_from="",
+ date_modified_to="",
+ statuses=[],
+ access_token="token",
+ )
+ self.assertEqual(
+ request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}
+ )
+ self.assertIsNone(request_json_mock.call_args.kwargs["headers"])
+
+ async def test_async_sessions_client_without_upo_feature_and_pagination(self):
+ response = HttpResponse(200, httpx.Headers({}), b"{}")
+ http = DummyAsyncHttp(response)
+ sessions = AsyncSessionsClient(http)
+ with patch.object(
+ sessions, "_request_json", AsyncMock(return_value={"ok": True})
+ ) as request_json_mock:
+ await sessions.open_online_session({"a": 1}, access_token="token")
+ await sessions.open_batch_session({"a": 1}, access_token="token")
+ await sessions.get_session_invoices("ref", continuation_token="", access_token="token")
+ await sessions.get_session_failed_invoices(
+ "ref", continuation_token="", access_token="token"
+ )
+
+ self.assertIsNone(request_json_mock.call_args_list[0].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[1].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[2].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[2].kwargs["params"])
+ self.assertIsNone(request_json_mock.call_args_list[3].kwargs["headers"])
+ self.assertIsNone(request_json_mock.call_args_list[3].kwargs["params"])
+
lighthouse = AsyncLighthouseClient(http, "https://api-latarnia-test.ksef.mf.gov.pl")
with patch.object(
lighthouse,
diff --git a/tests/test_crypto.py b/tests/test_crypto.py
index 697c12d..8487828 100644
--- a/tests/test_crypto.py
+++ b/tests/test_crypto.py
@@ -101,6 +101,8 @@ def test_private_key_loading(self):
rsa_cert = generate_rsa_cert()
key = crypto._load_private_key(rsa_cert.private_key_pem)
self.assertEqual(key.key_size, rsa_cert.private_key.key_size)
+ key_from_bytes = crypto._load_private_key(rsa_cert.private_key_pem.encode("ascii"))
+ self.assertEqual(key_from_bytes.key_size, rsa_cert.private_key.key_size)
der = rsa_cert.private_key.private_bytes(
encoding=crypto.serialization.Encoding.DER,
format=crypto.serialization.PrivateFormat.PKCS8,
diff --git a/tests/test_http.py b/tests/test_http.py
index b397b61..7ee5c46 100644
--- a/tests/test_http.py
+++ b/tests/test_http.py
@@ -293,6 +293,30 @@ async def test_async_skip_auth_presigned_validation_rejects_localhost(self):
with self.assertRaisesRegex(ValueError, "localhost"):
await client.request("GET", "https://localhost/upload", skip_auth=True)
+ async def test_async_request_absolute_url_without_skip_auth(self):
+ options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
+ client = AsyncBaseHttpClient(options, access_token="token")
+ response = httpx.Response(200, json={"ok": True})
+ with patch.object(
+ client._client, "request", AsyncMock(return_value=response)
+ ) as request_mock:
+ await client.request("GET", "https://files.example.com/upload")
+ _, kwargs = request_mock.call_args
+ self.assertEqual(kwargs["url"], "https://files.example.com/upload")
+ self.assertIn("Authorization", kwargs["headers"])
+
+ async def test_async_skip_auth_presigned_url_accepts_valid_https(self):
+ options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
+ client = AsyncBaseHttpClient(options, access_token="token")
+ response = httpx.Response(200, json={"ok": True})
+ with patch.object(
+ client._client, "request", AsyncMock(return_value=response)
+ ) as request_mock:
+ await client.request("GET", "https://files.example.com/upload", skip_auth=True)
+ _, kwargs = request_mock.call_args
+ self.assertEqual(kwargs["url"], "https://files.example.com/upload")
+ self.assertNotIn("Authorization", kwargs["headers"])
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_models.py b/tests/test_models.py
index b9740e2..08c9a11 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -127,6 +127,7 @@ def test_lighthouse_enum_fallbacks(self):
status = models.LighthouseStatusResponse.from_dict({"status": "UNKNOWN"})
self.assertEqual(status.status, models.LighthouseKsefStatus.AVAILABLE)
self.assertIsNone(status.messages)
+ self.assertNotIn("messages", status.to_dict())
if __name__ == "__main__":
diff --git a/tests/test_openapi_models.py b/tests/test_openapi_models.py
index 223650f..7e4b104 100644
--- a/tests/test_openapi_models.py
+++ b/tests/test_openapi_models.py
@@ -1,6 +1,7 @@
import json
import unittest
from pathlib import Path
+from unittest.mock import patch
from ksef_client import openapi_models as m
@@ -116,6 +117,19 @@ def test_part_upload_request_headers_keep_non_string_values(self):
self.assertTrue(parsed.headers["X-Enabled"])
self.assertEqual(parsed.to_dict()["headers"], payload["headers"])
+ def test_convert_value_handles_unsubscripted_list_hint(self):
+ self.assertEqual(m._convert_value(list, ["a", "b"]), ["a", "b"])
+
+ def test_convert_value_openapi_model_non_dict_returns_raw_value(self):
+ self.assertEqual(m._convert_value(m.AuthenticationListItem, "raw"), "raw")
+
+ def test_convert_value_union_with_empty_args_falls_through(self):
+ with (
+ patch("ksef_client.openapi_models.get_origin", return_value=object()),
+ patch("ksef_client.openapi_models.get_args", return_value=()),
+ ):
+ self.assertEqual(m._convert_value("ignored", "value"), "value")
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_services_csr.py b/tests/test_services_csr.py
index e2fe780..e5687e5 100644
--- a/tests/test_services_csr.py
+++ b/tests/test_services_csr.py
@@ -42,6 +42,10 @@ def test_generate_csr_ec(self):
parsed = x509.load_der_x509_csr(csr_bytes)
self.assertEqual(parsed.subject.rfc4514_string(), "CN=Test")
+ def test_build_subject_without_common_name(self):
+ subject = _build_subject({"organizationName": "KSeF"})
+ self.assertEqual(subject.rfc4514_string(), "O=KSeF")
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_services_person_token.py b/tests/test_services_person_token.py
index 0e675b9..66b78cc 100644
--- a/tests/test_services_person_token.py
+++ b/tests/test_services_person_token.py
@@ -63,6 +63,7 @@ def test_parse_json_string_array(self):
self.assertEqual(_parse_json_string_array(json.dumps(["a", "b"])), ["a", "b"])
self.assertEqual(_parse_json_string_array("a,b"), ["a", "b"])
self.assertEqual(_parse_json_string_array("a"), ["a"])
+ self.assertEqual(_parse_json_string_array(json.dumps({"a": 1})), ['{"a": 1}'])
self.assertEqual(_parse_json_string_array(None), [])
def test_unwrap_if_quoted_json(self):
diff --git a/tests/test_services_workflows.py b/tests/test_services_workflows.py
index 88d7855..4af4c23 100644
--- a/tests/test_services_workflows.py
+++ b/tests/test_services_workflows.py
@@ -360,6 +360,26 @@ def test_batch_session_workflow(self):
)
self.assertEqual(ref, "ref")
+ def test_batch_session_workflow_without_offline_mode_flag(self):
+ sessions = StubSessionsClient()
+ http = RecordingHttp()
+ workflow = workflows.BatchSessionWorkflow(sessions, http)
+ rsa_cert = generate_rsa_cert()
+ zip_bytes = build_zip({"a.xml": b""})
+ workflow.open_upload_and_close(
+ form_code={"systemCode": "FA"},
+ zip_bytes=zip_bytes,
+ public_certificate=rsa_cert.certificate_pem,
+ access_token="token",
+ offline_mode=None,
+ upo_v43=False,
+ parallelism=1,
+ )
+ open_batch_calls = [call for call in sessions.calls if call[0] == "open_batch"]
+ assert open_batch_calls
+ payload = open_batch_calls[0][1]
+ self.assertNotIn("offlineMode", payload)
+
def test_export_workflow(self):
key = generate_symmetric_key()
iv = generate_iv()
@@ -384,6 +404,30 @@ class DummyInvoices:
self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1")
self.assertIn("inv.xml", result.invoice_xml_files)
+ def test_export_workflow_ignores_non_xml_non_metadata_files(self):
+ key = generate_symmetric_key()
+ iv = generate_iv()
+ files = {
+ "_metadata.json": json.dumps({"invoices": [{"ksefNumber": "1"}]}).encode("utf-8"),
+ "inv.xml": b"",
+ "notes.txt": b"ignored",
+ }
+ archive = build_zip(files)
+ encrypted = encrypt_aes_cbc_pkcs7(archive, key, iv)
+ encryption = workflows.EncryptionData(key=key, iv=iv, encryption_info=None) # type: ignore[arg-type]
+
+ class DummyInvoices:
+ pass
+
+ workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), RecordingHttp())
+ with patch.object(
+ workflow._download_helper,
+ "download_parts_with_hash",
+ return_value=[(encrypted, _sha256_b64(encrypted))],
+ ):
+ result = workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+ self.assertNotIn("notes.txt", result.invoice_xml_files)
+
def test_export_workflow_rejects_missing_hash_by_default(self):
key = generate_symmetric_key()
iv = generate_iv()
@@ -647,6 +691,24 @@ async def test_async_online_and_batch(self):
)
self.assertEqual(ref, "ref")
+ async def test_async_batch_session_workflow_without_offline_mode_flag(self):
+ sessions = StubAsyncSessionsClient()
+ batch = workflows.AsyncBatchSessionWorkflow(sessions, RecordingAsyncHttp())
+ rsa_cert = generate_rsa_cert()
+ zip_bytes = build_zip({"a.xml": b""})
+ await batch.open_upload_and_close(
+ form_code={"systemCode": "FA"},
+ zip_bytes=zip_bytes,
+ public_certificate=rsa_cert.certificate_pem,
+ access_token="token",
+ offline_mode=None,
+ upo_v43=False,
+ )
+ open_batch_calls = [call for call in sessions.calls if call[0] == "open_batch"]
+ assert open_batch_calls
+ payload = open_batch_calls[0][1]
+ self.assertNotIn("offlineMode", payload)
+
async def test_async_export_workflow(self):
key = generate_symmetric_key()
iv = generate_iv()
@@ -675,6 +737,35 @@ class DummyInvoices:
)
self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1")
+ async def test_async_export_workflow_ignores_non_xml_non_metadata_files(self):
+ key = generate_symmetric_key()
+ iv = generate_iv()
+ files = {
+ "_metadata.json": json.dumps({"invoiceList": [{"ksefNumber": "1"}]}).encode("utf-8"),
+ "inv.xml": b"",
+ "notes.txt": b"ignored",
+ }
+ archive = build_zip(files)
+ encrypted = encrypt_aes_cbc_pkcs7(archive, key, iv)
+ encryption = workflows.EncryptionData(key=key, iv=iv, encryption_info=None) # type: ignore[arg-type]
+
+ class DummyInvoices:
+ pass
+
+ workflow = workflows.AsyncExportWorkflow(
+ cast(AsyncInvoicesClient, DummyInvoices()),
+ RecordingAsyncHttp(),
+ )
+ with patch.object(
+ workflow._download_helper,
+ "download_parts_with_hash",
+ AsyncMock(return_value=[(encrypted, _sha256_b64(encrypted))]),
+ ):
+ result = await workflow.download_and_process_package(
+ {"parts": [{"url": "u"}]}, encryption
+ )
+ self.assertNotIn("notes.txt", result.invoice_xml_files)
+
async def test_async_export_workflow_rejects_missing_hash_by_default(self):
key = generate_symmetric_key()
iv = generate_iv()
diff --git a/tests/test_zip_utils.py b/tests/test_zip_utils.py
index 6a69d97..bd81fb8 100644
--- a/tests/test_zip_utils.py
+++ b/tests/test_zip_utils.py
@@ -37,6 +37,14 @@ def test_unzip_limits_max_compression_ratio(self):
with self.assertRaises(ValueError):
unzip_bytes(zip_bytes, max_compression_ratio=ratio - 0.001)
+ def test_unzip_safe_allows_disabling_compression_ratio_check(self):
+ payload = b"a" * (256 * 1024)
+ buffer = BytesIO()
+ with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr("a.txt", payload)
+ unzipped = unzip_bytes_safe(buffer.getvalue(), max_compression_ratio=None)
+ self.assertEqual(unzipped["a.txt"], payload)
+
def test_unzip_safe_rejects_invalid_limits(self):
zip_bytes = build_zip({"a.txt": b"hello"})
with self.assertRaises(ValueError):
@@ -76,6 +84,11 @@ def infolist_with_bad_metadata(self):
), self.assertRaises(ValueError):
unzip_bytes_safe(zip_bytes)
+ def test_unzip_safe_accepts_empty_file_entry(self):
+ zip_bytes = build_zip({"empty.txt": b""})
+ unzipped = unzip_bytes_safe(zip_bytes)
+ self.assertEqual(unzipped["empty.txt"], b"")
+
def test_unzip_safe_rejects_absolute_entry_path(self):
buffer = BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf: