From 28a9db2b65dd7ca24faa592addeec937b9610c2b Mon Sep 17 00:00:00 2001 From: smkc Date: Sat, 28 Feb 2026 09:32:29 +0100 Subject: [PATCH 1/2] test: raise project coverage to 100 percent --- tests/cli/integration/test_init_command.py | 50 +++ tests/cli/unit/test_auth_manager.py | 96 +++++ tests/cli/unit/test_config_loader_profiles.py | 36 ++ tests/cli/unit/test_core_coverage.py | 12 + tests/cli/unit/test_output_human.py | 8 + tests/cli/unit/test_sdk_adapters.py | 386 ++++++++++++++++++ tests/test_base64url.py | 6 + tests/test_clients.py | 178 +++++++- tests/test_crypto.py | 2 + tests/test_http.py | 24 ++ tests/test_models.py | 1 + tests/test_openapi_models.py | 15 + tests/test_services_csr.py | 4 + tests/test_services_person_token.py | 1 + tests/test_services_workflows.py | 91 +++++ tests/test_zip_utils.py | 13 + 16 files changed, 922 insertions(+), 1 deletion(-) diff --git a/tests/cli/integration/test_init_command.py b/tests/cli/integration/test_init_command.py index 22d7225..10b4bdd 100644 --- a/tests/cli/integration/test_init_command.py +++ b/tests/cli/integration/test_init_command.py @@ -76,3 +76,53 @@ def test_init_non_interactive_requires_context_fields(runner, monkeypatch, tmp_p ], ) assert result.exit_code == 2 + + +def test_init_interactive_respects_provided_options_and_keeps_active_profile( + runner, monkeypatch, tmp_path +) -> None: + config_path = tmp_path / "config.json" + monkeypatch.setattr(paths, "config_file", lambda: config_path) + + seed_result = runner.invoke( + app, + [ + "--json", + "init", + "--name", + "seed", + "--env", + "DEMO", + "--context-type", + "nip", + "--context-value", + "111", + "--non-interactive", + "--set-active", + ], + ) + assert seed_result.exit_code == 0 + + result = runner.invoke( + app, + [ + "--json", + "init", + "--name", + "second", + "--env", + "TEST", + "--base-url", + "https://example.test", + "--context-type", + "nip", + "--context-value", + "222", + ], + input="second\n", + ) + assert result.exit_code == 0 + payload = _json_output(result.stdout) + assert payload["ok"] is True + assert payload["profile"] == "second" + assert payload["data"]["active_profile"] == "seed" diff --git a/tests/cli/unit/test_auth_manager.py b/tests/cli/unit/test_auth_manager.py index 354f07e..d7e23f9 100644 --- a/tests/cli/unit/test_auth_manager.py +++ b/tests/cli/unit/test_auth_manager.py @@ -180,6 +180,15 @@ def test_status_and_logout(monkeypatch) -> None: def test_select_certificate_and_require_non_empty_errors() -> None: + cert = manager._select_certificate( + [ + {"usage": ["KsefTokenEncryption"], "certificate": ""}, + {"usage": ["KsefTokenEncryption"], "certificate": "CERT"}, + ], + "KsefTokenEncryption", + ) + assert cert == "CERT" + with pytest.raises(CliError) as cert_error: manager._select_certificate([], "KsefTokenEncryption") assert cert_error.value.code == ExitCode.API_ERROR @@ -214,6 +223,26 @@ def test_resolve_base_url_uses_profile_when_missing(monkeypatch) -> None: assert manager.resolve_base_url(None, profile="demo") == "https://profile.example" +def test_resolve_base_url_falls_back_when_profile_base_url_empty(monkeypatch) -> None: + monkeypatch.setattr( + manager, + "load_config", + lambda: CliConfig( + active_profile="demo", + profiles={ + "demo": ProfileConfig( + name="demo", + env="DEMO", + base_url=" ", + context_type="nip", + context_value="123", + ) + }, + ), + ) + assert manager.resolve_base_url(None, profile="demo") == manager.KsefEnvironment.DEMO.value + + def test_resolve_lighthouse_base_url_prefers_explicit_value() -> None: assert manager.resolve_lighthouse_base_url(" https://api-latarnia-test.ksef.mf.gov.pl/ ") == ( "https://api-latarnia-test.ksef.mf.gov.pl/" @@ -269,6 +298,28 @@ def test_resolve_lighthouse_base_url_invalid_profile_base_fallback(monkeypatch) ) +def test_resolve_lighthouse_base_url_fallback_when_profile_base_url_empty(monkeypatch) -> None: + monkeypatch.setattr( + manager, + "load_config", + lambda: CliConfig( + active_profile="demo", + profiles={ + "demo": ProfileConfig( + name="demo", + env="DEMO", + base_url=" ", + context_type="nip", + context_value="123", + ) + }, + ), + ) + assert manager.resolve_lighthouse_base_url(None, profile="demo") == ( + KsefLighthouseEnvironment.TEST.value + ) + + def test_refresh_access_token_missing_token_in_response(monkeypatch) -> None: class _FakeClientNoToken: def __init__(self) -> None: @@ -492,3 +543,48 @@ def test_login_with_xades_loader_errors_are_mapped(monkeypatch) -> None: save=False, ) assert exc.value.code == ExitCode.VALIDATION_ERROR + + +def test_login_with_token_without_save(monkeypatch) -> None: + monkeypatch.setattr(manager, "create_client", lambda base_url: _FakeClient()) + monkeypatch.setattr(manager, "AuthCoordinator", _FakeAuthCoordinator) + monkeypatch.setattr(manager, "save_tokens", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("save_tokens should not be called"))) + monkeypatch.setattr( + manager, + "set_cached_metadata", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("set_cached_metadata should not be called")), + ) + + result = manager.login_with_token( + profile="demo", + base_url="https://api-demo.ksef.mf.gov.pl", + token="TOKEN", + context_type="nip", + context_value="5265877635", + poll_interval=0.0, + max_attempts=1, + save=False, + ) + assert result["saved"] is False + + +def test_refresh_access_token_success_without_save(monkeypatch) -> None: + monkeypatch.setattr(manager, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr(manager, "create_client", lambda base_url: _FakeClient()) + monkeypatch.setattr( + manager, + "save_tokens", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("save_tokens should not be called")), + ) + monkeypatch.setattr( + manager, + "set_cached_metadata", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("set_cached_metadata should not be called")), + ) + + result = manager.refresh_access_token( + profile="demo", + base_url="https://api-demo.ksef.mf.gov.pl", + save=False, + ) + assert result["saved"] is False diff --git a/tests/cli/unit/test_config_loader_profiles.py b/tests/cli/unit/test_config_loader_profiles.py index 9d8dd2e..d6549c4 100644 --- a/tests/cli/unit/test_config_loader_profiles.py +++ b/tests/cli/unit/test_config_loader_profiles.py @@ -82,6 +82,18 @@ def test_loader_profile_parser_skips_invalid_entries(monkeypatch, tmp_path) -> N assert loader.load_config().profiles == {} +def test_loader_accepts_non_dict_profiles_key(monkeypatch, tmp_path) -> None: + config_path = tmp_path / "config.json" + monkeypatch.setattr(paths, "config_file", lambda: config_path) + config_path.write_text( + json.dumps({"active_profile": "demo", "profiles": []}), + encoding="utf-8", + ) + loaded = loader.load_config() + assert loaded.profiles == {} + assert loaded.active_profile is None + + def test_loader_skips_non_string_profile_names(monkeypatch, tmp_path) -> None: config_path = tmp_path / "config.json" monkeypatch.setattr(paths, "config_file", lambda: config_path) @@ -293,3 +305,27 @@ def test_profiles_upsert_and_active_fallback() -> None: profiles.delete_profile(config, name="one") assert config.active_profile == "two" + + +def test_profiles_delete_non_active_profile_keeps_active() -> None: + config = CliConfig( + active_profile="one", + profiles={ + "one": ProfileConfig( + name="one", + env="DEMO", + base_url="https://api-demo.ksef.mf.gov.pl", + context_type="nip", + context_value="1", + ), + "two": ProfileConfig( + name="two", + env="TEST", + base_url="https://api-test.ksef.mf.gov.pl", + context_type="nip", + context_value="2", + ), + }, + ) + profiles.delete_profile(config, name="two") + assert config.active_profile == "one" diff --git a/tests/cli/unit/test_core_coverage.py b/tests/cli/unit/test_core_coverage.py index 871e867..8d00ec6 100644 --- a/tests/cli/unit/test_core_coverage.py +++ b/tests/cli/unit/test_core_coverage.py @@ -44,6 +44,18 @@ def _fake_entrypoint() -> None: assert called["value"] is True +def test_cli_main_module_import_does_not_invoke_entrypoint(monkeypatch) -> None: + called = {"value": False} + + def _fake_entrypoint() -> None: + called["value"] = True + + monkeypatch.setattr(app_module, "app_entrypoint", _fake_entrypoint) + main_module = importlib.import_module("ksef_client.cli.__main__") + importlib.reload(main_module) + assert called["value"] is False + + def test_config_loader_and_profiles(monkeypatch, tmp_path: Path) -> None: monkeypatch.setattr(paths, "config_file", lambda: tmp_path / "config.json") cfg = loader.load_config() diff --git a/tests/cli/unit/test_output_human.py b/tests/cli/unit/test_output_human.py index cab509d..175acf6 100644 --- a/tests/cli/unit/test_output_human.py +++ b/tests/cli/unit/test_output_human.py @@ -61,6 +61,14 @@ def test_human_renderer_success_skips_raw_response_payload(capsys) -> None: assert "raw" not in out +def test_human_renderer_success_without_data(capsys) -> None: + renderer = HumanRenderer(no_color=True) + renderer.success(command="send.status", profile="demo", data=None) + out = capsys.readouterr().out + assert "OK" in out + assert "send.status" in out + + def test_human_renderer_error_prints_hint(capsys) -> None: renderer = HumanRenderer(no_color=True) renderer.error( diff --git a/tests/cli/unit/test_sdk_adapters.py b/tests/cli/unit/test_sdk_adapters.py index 7bcc4d9..809d0a3 100644 --- a/tests/cli/unit/test_sdk_adapters.py +++ b/tests/cli/unit/test_sdk_adapters.py @@ -2187,3 +2187,389 @@ def get_public_key_certificates(self): check_certs=False, ) assert fail["overall"] == "FAIL" + + +def test_build_zip_from_directory_skips_non_file_xml_paths(tmp_path) -> None: + (tmp_path / "nested").mkdir() + (tmp_path / "nested" / "invoice.xml").mkdir() + with pytest.raises(CliError) as exc: + adapters._build_zip_from_directory(str(tmp_path)) + assert exc.value.code == ExitCode.VALIDATION_ERROR + + +def test_wait_helpers_include_details_in_error_hints(monkeypatch) -> None: + monkeypatch.setattr(adapters.time, "sleep", lambda _: None) + + class _InvoiceStatusClient: + class sessions: + @staticmethod + def get_session_invoice_status(session_ref, invoice_ref, access_token): + _ = (session_ref, invoice_ref, access_token) + return {"status": {"code": 400, "description": "bad", "details": ["d1"]}} + + class _SessionStatusClient: + class sessions: + @staticmethod + def get_session_status(session_ref, access_token): + _ = (session_ref, access_token) + return {"status": {"code": 400, "description": "bad", "details": ["d2"]}} + + class _ExportStatusClient: + class invoices: + @staticmethod + def get_export_status(reference_number, access_token): + _ = (reference_number, access_token) + return {"status": {"code": 400, "description": "bad", "details": ["d3"]}} + + with pytest.raises(CliError) as invoice_exc: + adapters._wait_for_invoice_status( + client=_InvoiceStatusClient(), + session_ref="S", + invoice_ref="I", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert "Details: d1" in (invoice_exc.value.hint or "") + + with pytest.raises(CliError) as session_exc: + adapters._wait_for_session_status( + client=_SessionStatusClient(), + session_ref="S", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert "Details: d2" in (session_exc.value.hint or "") + + with pytest.raises(CliError) as export_exc: + adapters._wait_for_export_status( + client=_ExportStatusClient(), + reference_number="R", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert "Details: d3" in (export_exc.value.hint or "") + + +def test_wait_helpers_error_hints_without_details(monkeypatch) -> None: + monkeypatch.setattr(adapters.time, "sleep", lambda _: None) + + class _InvoiceStatusClient: + class sessions: + @staticmethod + def get_session_invoice_status(session_ref, invoice_ref, access_token): + _ = (session_ref, invoice_ref, access_token) + return {"status": {"code": 400, "description": "bad"}} + + class _SessionStatusClient: + class sessions: + @staticmethod + def get_session_status(session_ref, access_token): + _ = (session_ref, access_token) + return {"status": {"code": 400, "description": "bad"}} + + class _ExportStatusClient: + class invoices: + @staticmethod + def get_export_status(reference_number, access_token): + _ = (reference_number, access_token) + return {"status": {"code": 400, "description": "bad"}} + + with pytest.raises(CliError) as invoice_exc: + adapters._wait_for_invoice_status( + client=_InvoiceStatusClient(), + session_ref="S", + invoice_ref="I", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert invoice_exc.value.hint == "bad" + + with pytest.raises(CliError) as session_exc: + adapters._wait_for_session_status( + client=_SessionStatusClient(), + session_ref="S", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert session_exc.value.hint == "bad" + + with pytest.raises(CliError) as export_exc: + adapters._wait_for_export_status( + client=_ExportStatusClient(), + reference_number="R", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert export_exc.value.hint == "bad" + + +def test_wait_helpers_retry_on_empty_upo_bytes(monkeypatch) -> None: + monkeypatch.setattr(adapters.time, "sleep", lambda _: None) + + class _InvoiceUpoClient: + class sessions: + @staticmethod + def get_session_invoice_upo_by_ref(session_ref, invoice_ref, access_token): + _ = (session_ref, invoice_ref, access_token) + return b"" + + class _BatchUpoClient: + class sessions: + @staticmethod + def get_session_upo(session_ref, upo_ref, access_token): + _ = (session_ref, upo_ref, access_token) + return b"" + + with pytest.raises(CliError) as invoice_exc: + adapters._wait_for_invoice_upo( + client=_InvoiceUpoClient(), + session_ref="S", + invoice_ref="I", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert invoice_exc.value.code == ExitCode.RETRY_EXHAUSTED + + with pytest.raises(CliError) as batch_exc: + adapters._wait_for_batch_upo( + client=_BatchUpoClient(), + session_ref="S", + upo_ref="U", + access_token="acc", + poll_interval=0.01, + max_attempts=1, + ) + assert batch_exc.value.code == ExitCode.RETRY_EXHAUSTED + + +def test_wait_for_upo_invoice_ref_pending_status_times_out(monkeypatch) -> None: + class _Sessions: + @staticmethod + def get_session_invoice_status(session_ref, invoice_ref, access_token): + _ = (session_ref, invoice_ref, access_token) + return {"status": {"code": 100}} + + monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr( + adapters, + "create_client", + lambda base_url, access_token=None: _FakeClient(sessions=_Sessions()), + ) + monkeypatch.setattr(adapters.time, "sleep", lambda _: None) + + with pytest.raises(CliError) as exc: + adapters.wait_for_upo( + profile="demo", + base_url="https://example.invalid", + session_ref="SES-PENDING", + invoice_ref="INV-PENDING", + upo_ref=None, + batch_auto=False, + poll_interval=0.01, + max_attempts=1, + out=None, + overwrite=False, + ) + assert exc.value.code == ExitCode.RETRY_EXHAUSTED + + +def test_send_online_invoice_success_without_waits(monkeypatch, tmp_path) -> None: + class _Security: + @staticmethod + def get_public_key_certificates(): + return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}] + + class _Sessions: + pass + + class _OnlineWorkflow: + def __init__(self, sessions): + _ = sessions + + def open_session(self, *, form_code, public_certificate, access_token, upo_v43=False): + _ = (form_code, public_certificate, access_token, upo_v43) + return SimpleNamespace( + session_reference_number="SES-NO-WAIT", + encryption_data=SimpleNamespace(key=b"k", iv=b"i"), + ) + + def send_invoice(self, **kwargs): + _ = kwargs + return {"referenceNumber": "INV-NO-WAIT"} + + def close_session(self, reference_number, access_token): + _ = (reference_number, access_token) + + monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr(adapters, "OnlineSessionWorkflow", _OnlineWorkflow) + monkeypatch.setattr( + adapters, + "create_client", + lambda base_url, access_token=None: _FakeClient(sessions=_Sessions(), security=_Security()), + ) + + invoice_path = tmp_path / "invoice.xml" + invoice_path.write_text("", encoding="utf-8") + result = adapters.send_online_invoice( + profile="demo", + base_url="https://example.invalid", + invoice=str(invoice_path), + system_code="FA (3)", + schema_version="1-0E", + form_value="FA", + upo_v43=False, + wait_status=False, + wait_upo=False, + poll_interval=0.01, + max_attempts=1, + save_upo=None, + ) + assert result == {"session_ref": "SES-NO-WAIT", "invoice_ref": "INV-NO-WAIT"} + + +def test_send_online_invoice_wait_status_without_wait_upo(monkeypatch, tmp_path) -> None: + class _Security: + @staticmethod + def get_public_key_certificates(): + return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}] + + class _Sessions: + pass + + class _OnlineWorkflow: + def __init__(self, sessions): + _ = sessions + + def open_session(self, *, form_code, public_certificate, access_token, upo_v43=False): + _ = (form_code, public_certificate, access_token, upo_v43) + return SimpleNamespace( + session_reference_number="SES-STATUS-ONLY", + encryption_data=SimpleNamespace(key=b"k", iv=b"i"), + ) + + def send_invoice(self, **kwargs): + _ = kwargs + return {"referenceNumber": "INV-STATUS-ONLY"} + + def close_session(self, reference_number, access_token): + _ = (reference_number, access_token) + + monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr(adapters, "OnlineSessionWorkflow", _OnlineWorkflow) + monkeypatch.setattr( + adapters, + "create_client", + lambda base_url, access_token=None: _FakeClient(sessions=_Sessions(), security=_Security()), + ) + monkeypatch.setattr( + adapters, + "_wait_for_invoice_status", + lambda **kwargs: {"status": {"code": 200, "description": "Accepted"}, "ksefNumber": "KSEF-X"}, + ) + + invoice_path = tmp_path / "invoice.xml" + invoice_path.write_text("", encoding="utf-8") + result = adapters.send_online_invoice( + profile="demo", + base_url="https://example.invalid", + invoice=str(invoice_path), + system_code="FA (3)", + schema_version="1-0E", + form_value="FA", + upo_v43=False, + wait_status=True, + wait_upo=False, + poll_interval=0.01, + max_attempts=1, + save_upo=None, + ) + assert result["ksef_number"] == "KSEF-X" + assert "upo_bytes" not in result + + +def test_send_batch_invoices_wait_status_without_wait_upo(monkeypatch, tmp_path) -> None: + class _Security: + @staticmethod + def get_public_key_certificates(): + return [{"usage": ["SymmetricKeyEncryption"], "certificate": "CERT"}] + + class _Sessions: + @staticmethod + def get_session_status(session_ref, access_token): + _ = (session_ref, access_token) + return {"status": {"code": 200, "description": "Done"}, "upoReferenceNumber": "UPO-1"} + + class _BatchWorkflow: + def __init__(self, sessions, http_client): + _ = (sessions, http_client) + + @staticmethod + def open_upload_and_close(**kwargs): + _ = kwargs + return "SES-BATCH-STATUS" + + monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr(adapters, "BatchSessionWorkflow", _BatchWorkflow) + monkeypatch.setattr( + adapters, + "create_client", + lambda base_url, access_token=None: _FakeClient( + sessions=_Sessions(), security=_Security(), http_client=SimpleNamespace() + ), + ) + monkeypatch.setattr(adapters.time, "sleep", lambda _: None) + + zip_path = tmp_path / "batch.zip" + zip_path.write_bytes(b"PK\x03\x04") + result = adapters.send_batch_invoices( + profile="demo", + base_url="https://example.invalid", + zip_path=str(zip_path), + directory=None, + system_code="FA (3)", + schema_version="1-0E", + form_value="FA", + parallelism=1, + upo_v43=False, + wait_status=True, + wait_upo=False, + poll_interval=0.01, + max_attempts=1, + save_upo=None, + ) + assert result["session_ref"] == "SES-BATCH-STATUS" + assert result["upo_ref"] == "UPO-1" + assert "upo_path" not in result + + +def test_run_health_check_ignores_non_list_usage_values(monkeypatch) -> None: + class _SecurityClient: + @staticmethod + def get_public_key_certificates(): + return [ + {"usage": "KsefTokenEncryption", "certificate": "A"}, + {"usage": ["SymmetricKeyEncryption"], "certificate": "B"}, + ] + + monkeypatch.setattr(adapters, "get_tokens", lambda profile: ("acc", "ref")) + monkeypatch.setattr( + adapters, + "create_client", + lambda base_url, access_token=None: _FakeClient(security=_SecurityClient()), + ) + result = adapters.run_health_check( + profile="demo", + base_url="https://example.invalid", + dry_run=True, + check_auth=False, + check_certs=False, + ) + cert_check = [item for item in result["checks"] if item["name"] == "certificates"][0] + assert cert_check["status"] == "FAIL" diff --git a/tests/test_base64url.py b/tests/test_base64url.py index 86f2ea7..bace5c8 100644 --- a/tests/test_base64url.py +++ b/tests/test_base64url.py @@ -16,6 +16,12 @@ def test_standard_base64(self): decoded = b64decode(encoded) self.assertEqual(decoded, data) + def test_standard_base64_decode_bytes_input(self): + data = b"data" + encoded = b64encode(data).encode("ascii") + decoded = b64decode(encoded) + self.assertEqual(decoded, data) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_clients.py b/tests/test_clients.py index 54874ac..adb1486 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -10,11 +10,12 @@ AsyncInvoicesClient, InvoicesClient, _normalize_datetime_without_offset, + _normalize_invoice_date_range_payload, ) from ksef_client.clients.lighthouse import AsyncLighthouseClient, LighthouseClient from ksef_client.clients.limits import AsyncLimitsClient, LimitsClient from ksef_client.clients.peppol import AsyncPeppolClient, PeppolClient -from ksef_client.clients.permissions import AsyncPermissionsClient, PermissionsClient +from ksef_client.clients.permissions import AsyncPermissionsClient, PermissionsClient, _page_params from ksef_client.clients.rate_limits import AsyncRateLimitsClient, RateLimitsClient from ksef_client.clients.security import AsyncSecurityClient, SecurityClient from ksef_client.clients.sessions import AsyncSessionsClient, SessionsClient @@ -76,6 +77,13 @@ def test_auth_client(self): client.redeem_token("auth") client.refresh_access_token("refresh") + def test_auth_client_get_active_sessions_without_optional_filters(self): + client = AuthClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.get_active_sessions(continuation_token="", access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + def test_sessions_client(self): client = SessionsClient(self.http) with ( @@ -113,6 +121,40 @@ def test_sessions_client(self): client.get_session_invoice_upo_by_ksef("ref", "ksef", access_token="token") client.get_session_upo("ref", "upo", access_token="token") + def test_sessions_client_without_optional_filters(self): + client = SessionsClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.get_sessions( + session_type="online", + continuation_token="", + reference_number="", + date_created_from="", + date_created_to="", + date_closed_from="", + date_closed_to="", + date_modified_from="", + date_modified_to="", + statuses=[], + access_token="token", + ) + self.assertEqual(request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}) + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + + def test_sessions_client_without_upo_feature_and_pagination(self): + client = SessionsClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.open_online_session({"a": 1}, access_token="token") + client.open_batch_session({"a": 1}, access_token="token") + client.get_session_invoices("ref", continuation_token="", access_token="token") + client.get_session_failed_invoices("ref", continuation_token="", access_token="token") + + self.assertIsNone(request_json_mock.call_args_list[0].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[1].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[2].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[2].kwargs["params"]) + self.assertIsNone(request_json_mock.call_args_list[3].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[3].kwargs["params"]) + def test_invoices_client(self): client = InvoicesClient(self.http) query_payload = { @@ -175,6 +217,21 @@ def test_invoices_client(self): "2025-07-02T11:15:00+02:00", ) + def test_invoices_client_query_metadata_without_optional_params(self): + client = InvoicesClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + + def test_normalize_invoice_date_range_payload_passthrough_branches(self): + payload = { + "dateRange": {"from": 1, "to": None}, + "filters": {"dateRange": "invalid"}, + } + normalized = _normalize_invoice_date_range_payload(payload) + self.assertEqual(normalized["dateRange"]["from"], 1) + self.assertIsNone(normalized["dateRange"]["to"]) + def test_normalize_datetime_without_offset_passthrough_branches(self): self.assertEqual(_normalize_datetime_without_offset("2025-01-02"), "2025-01-02") @@ -215,6 +272,9 @@ def test_permissions_client(self): ) client.query_subunits_grants(payload, page_offset=0, page_size=10, access_token="token") + def test_permissions_page_params_without_values(self): + self.assertEqual(_page_params(None, None), {}) + def test_certificates_client(self): client = CertificatesClient(self.http) with patch.object(client, "_request_json", Mock(return_value={"ok": True})): @@ -232,6 +292,12 @@ def test_certificates_client(self): client.retrieve_certificate(payload, access_token="token") client.revoke_certificate("serial", payload, access_token="token") + def test_certificates_client_query_without_pagination(self): + client = CertificatesClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.query_certificates({"a": 1}, access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + def test_tokens_client(self): client = TokensClient(self.http) with patch.object(client, "_request_json", Mock(return_value={"ok": True})): @@ -248,6 +314,20 @@ def test_tokens_client(self): client.get_token_status("ref", access_token="token") client.revoke_token("ref", access_token="token") + def test_tokens_client_list_tokens_without_optional_filters(self): + client = TokensClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.list_tokens( + access_token="token", + statuses=[], + description="", + author_identifier="", + author_identifier_type="", + continuation_token="", + ) + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + def test_limits_clients(self): client = LimitsClient(self.http) with patch.object(client, "_request_json", Mock(return_value={"ok": True})): @@ -289,6 +369,12 @@ def test_peppol_client(self): with patch.object(client, "_request_json", Mock(return_value={"ok": True})): client.list_providers(page_offset=0, page_size=10) + def test_peppol_client_without_pagination(self): + client = PeppolClient(self.http) + with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + client.list_providers() + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + def test_lighthouse_client(self): client = LighthouseClient(self.http, "https://api-latarnia-test.ksef.mf.gov.pl") with patch.object( @@ -561,6 +647,23 @@ async def test_async_clients(self): await tokens.get_token_status("ref", access_token="token") await tokens.revoke_token("ref", access_token="token") + async def test_async_tokens_client_list_tokens_without_optional_filters(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + payload = {"a": 1} + tokens = AsyncTokensClient(http) + with patch.object(tokens, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await tokens.list_tokens( + access_token="token", + statuses=[], + description="", + author_identifier="", + author_identifier_type="", + continuation_token="", + ) + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + limits = AsyncLimitsClient(http) with patch.object(limits, "_request_json", AsyncMock(return_value={"ok": True})): await limits.get_context_limits("token") @@ -598,6 +701,79 @@ async def test_async_clients(self): with patch.object(peppol, "_request_json", AsyncMock(return_value={"ok": True})): await peppol.list_providers(page_offset=0, page_size=10) + async def test_async_peppol_client_without_pagination(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + peppol = AsyncPeppolClient(http) + with patch.object(peppol, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await peppol.list_providers() + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + + async def test_async_auth_client_get_active_sessions_without_optional_filters(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + auth = AsyncAuthClient(http) + with patch.object(auth, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await auth.get_active_sessions(continuation_token="", access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + + async def test_async_certificates_client_query_without_pagination(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + certificates = AsyncCertificatesClient(http) + with patch.object( + certificates, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: + await certificates.query_certificates({"a": 1}, access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + + async def test_async_invoices_client_query_metadata_without_optional_params(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + invoices = AsyncInvoicesClient(http) + with patch.object(invoices, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await invoices.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token") + self.assertIsNone(request_json_mock.call_args.kwargs["params"]) + + async def test_async_sessions_client_without_optional_filters(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + sessions = AsyncSessionsClient(http) + with patch.object(sessions, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await sessions.get_sessions( + session_type="online", + continuation_token="", + reference_number="", + date_created_from="", + date_created_to="", + date_closed_from="", + date_closed_to="", + date_modified_from="", + date_modified_to="", + statuses=[], + access_token="token", + ) + self.assertEqual(request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}) + self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) + + async def test_async_sessions_client_without_upo_feature_and_pagination(self): + response = HttpResponse(200, httpx.Headers({}), b"{}") + http = DummyAsyncHttp(response) + sessions = AsyncSessionsClient(http) + with patch.object(sessions, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + await sessions.open_online_session({"a": 1}, access_token="token") + await sessions.open_batch_session({"a": 1}, access_token="token") + await sessions.get_session_invoices("ref", continuation_token="", access_token="token") + await sessions.get_session_failed_invoices("ref", continuation_token="", access_token="token") + + self.assertIsNone(request_json_mock.call_args_list[0].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[1].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[2].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[2].kwargs["params"]) + self.assertIsNone(request_json_mock.call_args_list[3].kwargs["headers"]) + self.assertIsNone(request_json_mock.call_args_list[3].kwargs["params"]) + lighthouse = AsyncLighthouseClient(http, "https://api-latarnia-test.ksef.mf.gov.pl") with patch.object( lighthouse, diff --git a/tests/test_crypto.py b/tests/test_crypto.py index 697c12d..8487828 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -101,6 +101,8 @@ def test_private_key_loading(self): rsa_cert = generate_rsa_cert() key = crypto._load_private_key(rsa_cert.private_key_pem) self.assertEqual(key.key_size, rsa_cert.private_key.key_size) + key_from_bytes = crypto._load_private_key(rsa_cert.private_key_pem.encode("ascii")) + self.assertEqual(key_from_bytes.key_size, rsa_cert.private_key.key_size) der = rsa_cert.private_key.private_bytes( encoding=crypto.serialization.Encoding.DER, format=crypto.serialization.PrivateFormat.PKCS8, diff --git a/tests/test_http.py b/tests/test_http.py index b397b61..7ee5c46 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -293,6 +293,30 @@ async def test_async_skip_auth_presigned_validation_rejects_localhost(self): with self.assertRaisesRegex(ValueError, "localhost"): await client.request("GET", "https://localhost/upload", skip_auth=True) + async def test_async_request_absolute_url_without_skip_auth(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = AsyncBaseHttpClient(options, access_token="token") + response = httpx.Response(200, json={"ok": True}) + with patch.object( + client._client, "request", AsyncMock(return_value=response) + ) as request_mock: + await client.request("GET", "https://files.example.com/upload") + _, kwargs = request_mock.call_args + self.assertEqual(kwargs["url"], "https://files.example.com/upload") + self.assertIn("Authorization", kwargs["headers"]) + + async def test_async_skip_auth_presigned_url_accepts_valid_https(self): + options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl") + client = AsyncBaseHttpClient(options, access_token="token") + response = httpx.Response(200, json={"ok": True}) + with patch.object( + client._client, "request", AsyncMock(return_value=response) + ) as request_mock: + await client.request("GET", "https://files.example.com/upload", skip_auth=True) + _, kwargs = request_mock.call_args + self.assertEqual(kwargs["url"], "https://files.example.com/upload") + self.assertNotIn("Authorization", kwargs["headers"]) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_models.py b/tests/test_models.py index b9740e2..08c9a11 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -127,6 +127,7 @@ def test_lighthouse_enum_fallbacks(self): status = models.LighthouseStatusResponse.from_dict({"status": "UNKNOWN"}) self.assertEqual(status.status, models.LighthouseKsefStatus.AVAILABLE) self.assertIsNone(status.messages) + self.assertNotIn("messages", status.to_dict()) if __name__ == "__main__": diff --git a/tests/test_openapi_models.py b/tests/test_openapi_models.py index 223650f..82ef6d2 100644 --- a/tests/test_openapi_models.py +++ b/tests/test_openapi_models.py @@ -1,5 +1,7 @@ import json +from typing import List import unittest +from unittest.mock import patch from pathlib import Path from ksef_client import openapi_models as m @@ -116,6 +118,19 @@ def test_part_upload_request_headers_keep_non_string_values(self): self.assertTrue(parsed.headers["X-Enabled"]) self.assertEqual(parsed.to_dict()["headers"], payload["headers"]) + def test_convert_value_handles_unsubscripted_list_hint(self): + self.assertEqual(m._convert_value(List, ["a", "b"]), ["a", "b"]) + + def test_convert_value_openapi_model_non_dict_returns_raw_value(self): + self.assertEqual(m._convert_value(m.AuthenticationListItem, "raw"), "raw") + + def test_convert_value_union_with_empty_args_falls_through(self): + with ( + patch("ksef_client.openapi_models.get_origin", return_value=object()), + patch("ksef_client.openapi_models.get_args", return_value=()), + ): + self.assertEqual(m._convert_value("ignored", "value"), "value") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_services_csr.py b/tests/test_services_csr.py index e2fe780..e5687e5 100644 --- a/tests/test_services_csr.py +++ b/tests/test_services_csr.py @@ -42,6 +42,10 @@ def test_generate_csr_ec(self): parsed = x509.load_der_x509_csr(csr_bytes) self.assertEqual(parsed.subject.rfc4514_string(), "CN=Test") + def test_build_subject_without_common_name(self): + subject = _build_subject({"organizationName": "KSeF"}) + self.assertEqual(subject.rfc4514_string(), "O=KSeF") + if __name__ == "__main__": unittest.main() diff --git a/tests/test_services_person_token.py b/tests/test_services_person_token.py index 0e675b9..66b78cc 100644 --- a/tests/test_services_person_token.py +++ b/tests/test_services_person_token.py @@ -63,6 +63,7 @@ def test_parse_json_string_array(self): self.assertEqual(_parse_json_string_array(json.dumps(["a", "b"])), ["a", "b"]) self.assertEqual(_parse_json_string_array("a,b"), ["a", "b"]) self.assertEqual(_parse_json_string_array("a"), ["a"]) + self.assertEqual(_parse_json_string_array(json.dumps({"a": 1})), ['{"a": 1}']) self.assertEqual(_parse_json_string_array(None), []) def test_unwrap_if_quoted_json(self): diff --git a/tests/test_services_workflows.py b/tests/test_services_workflows.py index 88d7855..4af4c23 100644 --- a/tests/test_services_workflows.py +++ b/tests/test_services_workflows.py @@ -360,6 +360,26 @@ def test_batch_session_workflow(self): ) self.assertEqual(ref, "ref") + def test_batch_session_workflow_without_offline_mode_flag(self): + sessions = StubSessionsClient() + http = RecordingHttp() + workflow = workflows.BatchSessionWorkflow(sessions, http) + rsa_cert = generate_rsa_cert() + zip_bytes = build_zip({"a.xml": b""}) + workflow.open_upload_and_close( + form_code={"systemCode": "FA"}, + zip_bytes=zip_bytes, + public_certificate=rsa_cert.certificate_pem, + access_token="token", + offline_mode=None, + upo_v43=False, + parallelism=1, + ) + open_batch_calls = [call for call in sessions.calls if call[0] == "open_batch"] + assert open_batch_calls + payload = open_batch_calls[0][1] + self.assertNotIn("offlineMode", payload) + def test_export_workflow(self): key = generate_symmetric_key() iv = generate_iv() @@ -384,6 +404,30 @@ class DummyInvoices: self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1") self.assertIn("inv.xml", result.invoice_xml_files) + def test_export_workflow_ignores_non_xml_non_metadata_files(self): + key = generate_symmetric_key() + iv = generate_iv() + files = { + "_metadata.json": json.dumps({"invoices": [{"ksefNumber": "1"}]}).encode("utf-8"), + "inv.xml": b"", + "notes.txt": b"ignored", + } + archive = build_zip(files) + encrypted = encrypt_aes_cbc_pkcs7(archive, key, iv) + encryption = workflows.EncryptionData(key=key, iv=iv, encryption_info=None) # type: ignore[arg-type] + + class DummyInvoices: + pass + + workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), RecordingHttp()) + with patch.object( + workflow._download_helper, + "download_parts_with_hash", + return_value=[(encrypted, _sha256_b64(encrypted))], + ): + result = workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption) + self.assertNotIn("notes.txt", result.invoice_xml_files) + def test_export_workflow_rejects_missing_hash_by_default(self): key = generate_symmetric_key() iv = generate_iv() @@ -647,6 +691,24 @@ async def test_async_online_and_batch(self): ) self.assertEqual(ref, "ref") + async def test_async_batch_session_workflow_without_offline_mode_flag(self): + sessions = StubAsyncSessionsClient() + batch = workflows.AsyncBatchSessionWorkflow(sessions, RecordingAsyncHttp()) + rsa_cert = generate_rsa_cert() + zip_bytes = build_zip({"a.xml": b""}) + await batch.open_upload_and_close( + form_code={"systemCode": "FA"}, + zip_bytes=zip_bytes, + public_certificate=rsa_cert.certificate_pem, + access_token="token", + offline_mode=None, + upo_v43=False, + ) + open_batch_calls = [call for call in sessions.calls if call[0] == "open_batch"] + assert open_batch_calls + payload = open_batch_calls[0][1] + self.assertNotIn("offlineMode", payload) + async def test_async_export_workflow(self): key = generate_symmetric_key() iv = generate_iv() @@ -675,6 +737,35 @@ class DummyInvoices: ) self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1") + async def test_async_export_workflow_ignores_non_xml_non_metadata_files(self): + key = generate_symmetric_key() + iv = generate_iv() + files = { + "_metadata.json": json.dumps({"invoiceList": [{"ksefNumber": "1"}]}).encode("utf-8"), + "inv.xml": b"", + "notes.txt": b"ignored", + } + archive = build_zip(files) + encrypted = encrypt_aes_cbc_pkcs7(archive, key, iv) + encryption = workflows.EncryptionData(key=key, iv=iv, encryption_info=None) # type: ignore[arg-type] + + class DummyInvoices: + pass + + workflow = workflows.AsyncExportWorkflow( + cast(AsyncInvoicesClient, DummyInvoices()), + RecordingAsyncHttp(), + ) + with patch.object( + workflow._download_helper, + "download_parts_with_hash", + AsyncMock(return_value=[(encrypted, _sha256_b64(encrypted))]), + ): + result = await workflow.download_and_process_package( + {"parts": [{"url": "u"}]}, encryption + ) + self.assertNotIn("notes.txt", result.invoice_xml_files) + async def test_async_export_workflow_rejects_missing_hash_by_default(self): key = generate_symmetric_key() iv = generate_iv() diff --git a/tests/test_zip_utils.py b/tests/test_zip_utils.py index 6a69d97..bd81fb8 100644 --- a/tests/test_zip_utils.py +++ b/tests/test_zip_utils.py @@ -37,6 +37,14 @@ def test_unzip_limits_max_compression_ratio(self): with self.assertRaises(ValueError): unzip_bytes(zip_bytes, max_compression_ratio=ratio - 0.001) + def test_unzip_safe_allows_disabling_compression_ratio_check(self): + payload = b"a" * (256 * 1024) + buffer = BytesIO() + with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf: + zf.writestr("a.txt", payload) + unzipped = unzip_bytes_safe(buffer.getvalue(), max_compression_ratio=None) + self.assertEqual(unzipped["a.txt"], payload) + def test_unzip_safe_rejects_invalid_limits(self): zip_bytes = build_zip({"a.txt": b"hello"}) with self.assertRaises(ValueError): @@ -76,6 +84,11 @@ def infolist_with_bad_metadata(self): ), self.assertRaises(ValueError): unzip_bytes_safe(zip_bytes) + def test_unzip_safe_accepts_empty_file_entry(self): + zip_bytes = build_zip({"empty.txt": b""}) + unzipped = unzip_bytes_safe(zip_bytes) + self.assertEqual(unzipped["empty.txt"], b"") + def test_unzip_safe_rejects_absolute_entry_path(self): buffer = BytesIO() with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf: From b2a14908b412d344cd4df741081752192d158159 Mon Sep 17 00:00:00 2001 From: smkc Date: Sat, 28 Feb 2026 09:40:03 +0100 Subject: [PATCH 2/2] test: fix lint issues in coverage tests --- tests/cli/unit/test_auth_manager.py | 27 ++++++++---- tests/cli/unit/test_sdk_adapters.py | 5 ++- tests/test_clients.py | 64 +++++++++++++++++++++-------- tests/test_openapi_models.py | 5 +-- 4 files changed, 74 insertions(+), 27 deletions(-) diff --git a/tests/cli/unit/test_auth_manager.py b/tests/cli/unit/test_auth_manager.py index d7e23f9..acefce3 100644 --- a/tests/cli/unit/test_auth_manager.py +++ b/tests/cli/unit/test_auth_manager.py @@ -244,9 +244,10 @@ def test_resolve_base_url_falls_back_when_profile_base_url_empty(monkeypatch) -> def test_resolve_lighthouse_base_url_prefers_explicit_value() -> None: - assert manager.resolve_lighthouse_base_url(" https://api-latarnia-test.ksef.mf.gov.pl/ ") == ( - "https://api-latarnia-test.ksef.mf.gov.pl/" - ).strip() + assert ( + manager.resolve_lighthouse_base_url(" https://api-latarnia-test.ksef.mf.gov.pl/ ") + == ("https://api-latarnia-test.ksef.mf.gov.pl/").strip() + ) def test_resolve_lighthouse_base_url_uses_profile_mapping(monkeypatch) -> None: @@ -548,11 +549,19 @@ def test_login_with_xades_loader_errors_are_mapped(monkeypatch) -> None: def test_login_with_token_without_save(monkeypatch) -> None: monkeypatch.setattr(manager, "create_client", lambda base_url: _FakeClient()) monkeypatch.setattr(manager, "AuthCoordinator", _FakeAuthCoordinator) - monkeypatch.setattr(manager, "save_tokens", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("save_tokens should not be called"))) + monkeypatch.setattr( + manager, + "save_tokens", + lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("save_tokens should not be called") + ), + ) monkeypatch.setattr( manager, "set_cached_metadata", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("set_cached_metadata should not be called")), + lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("set_cached_metadata should not be called") + ), ) result = manager.login_with_token( @@ -574,12 +583,16 @@ def test_refresh_access_token_success_without_save(monkeypatch) -> None: monkeypatch.setattr( manager, "save_tokens", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("save_tokens should not be called")), + lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("save_tokens should not be called") + ), ) monkeypatch.setattr( manager, "set_cached_metadata", - lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("set_cached_metadata should not be called")), + lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("set_cached_metadata should not be called") + ), ) result = manager.refresh_access_token( diff --git a/tests/cli/unit/test_sdk_adapters.py b/tests/cli/unit/test_sdk_adapters.py index 809d0a3..e82f4c7 100644 --- a/tests/cli/unit/test_sdk_adapters.py +++ b/tests/cli/unit/test_sdk_adapters.py @@ -2471,7 +2471,10 @@ def close_session(self, reference_number, access_token): monkeypatch.setattr( adapters, "_wait_for_invoice_status", - lambda **kwargs: {"status": {"code": 200, "description": "Accepted"}, "ksefNumber": "KSEF-X"}, + lambda **kwargs: { + "status": {"code": 200, "description": "Accepted"}, + "ksefNumber": "KSEF-X", + }, ) invoice_path = tmp_path / "invoice.xml" diff --git a/tests/test_clients.py b/tests/test_clients.py index adb1486..c48577b 100644 --- a/tests/test_clients.py +++ b/tests/test_clients.py @@ -79,7 +79,9 @@ def test_auth_client(self): def test_auth_client_get_active_sessions_without_optional_filters(self): client = AuthClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.get_active_sessions(continuation_token="", access_token="token") self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -123,7 +125,9 @@ def test_sessions_client(self): def test_sessions_client_without_optional_filters(self): client = SessionsClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.get_sessions( session_type="online", continuation_token="", @@ -137,12 +141,16 @@ def test_sessions_client_without_optional_filters(self): statuses=[], access_token="token", ) - self.assertEqual(request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}) + self.assertEqual( + request_json_mock.call_args.kwargs["params"], {"sessionType": "online"} + ) self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) def test_sessions_client_without_upo_feature_and_pagination(self): client = SessionsClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.open_online_session({"a": 1}, access_token="token") client.open_batch_session({"a": 1}, access_token="token") client.get_session_invoices("ref", continuation_token="", access_token="token") @@ -219,7 +227,9 @@ def test_invoices_client(self): def test_invoices_client_query_metadata_without_optional_params(self): client = InvoicesClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token") self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -294,7 +304,9 @@ def test_certificates_client(self): def test_certificates_client_query_without_pagination(self): client = CertificatesClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.query_certificates({"a": 1}, access_token="token") self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -316,7 +328,9 @@ def test_tokens_client(self): def test_tokens_client_list_tokens_without_optional_filters(self): client = TokensClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.list_tokens( access_token="token", statuses=[], @@ -371,7 +385,9 @@ def test_peppol_client(self): def test_peppol_client_without_pagination(self): client = PeppolClient(self.http) - with patch.object(client, "_request_json", Mock(return_value={"ok": True})) as request_json_mock: + with patch.object( + client, "_request_json", Mock(return_value={"ok": True}) + ) as request_json_mock: client.list_providers() self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -652,7 +668,9 @@ async def test_async_tokens_client_list_tokens_without_optional_filters(self): http = DummyAsyncHttp(response) payload = {"a": 1} tokens = AsyncTokensClient(http) - with patch.object(tokens, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + tokens, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await tokens.list_tokens( access_token="token", statuses=[], @@ -705,7 +723,9 @@ async def test_async_peppol_client_without_pagination(self): response = HttpResponse(200, httpx.Headers({}), b"{}") http = DummyAsyncHttp(response) peppol = AsyncPeppolClient(http) - with patch.object(peppol, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + peppol, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await peppol.list_providers() self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -713,7 +733,9 @@ async def test_async_auth_client_get_active_sessions_without_optional_filters(se response = HttpResponse(200, httpx.Headers({}), b"{}") http = DummyAsyncHttp(response) auth = AsyncAuthClient(http) - with patch.object(auth, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + auth, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await auth.get_active_sessions(continuation_token="", access_token="token") self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -732,7 +754,9 @@ async def test_async_invoices_client_query_metadata_without_optional_params(self response = HttpResponse(200, httpx.Headers({}), b"{}") http = DummyAsyncHttp(response) invoices = AsyncInvoicesClient(http) - with patch.object(invoices, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + invoices, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await invoices.query_invoice_metadata({"subjectType": "Subject1"}, access_token="token") self.assertIsNone(request_json_mock.call_args.kwargs["params"]) @@ -740,7 +764,9 @@ async def test_async_sessions_client_without_optional_filters(self): response = HttpResponse(200, httpx.Headers({}), b"{}") http = DummyAsyncHttp(response) sessions = AsyncSessionsClient(http) - with patch.object(sessions, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + sessions, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await sessions.get_sessions( session_type="online", continuation_token="", @@ -754,18 +780,24 @@ async def test_async_sessions_client_without_optional_filters(self): statuses=[], access_token="token", ) - self.assertEqual(request_json_mock.call_args.kwargs["params"], {"sessionType": "online"}) + self.assertEqual( + request_json_mock.call_args.kwargs["params"], {"sessionType": "online"} + ) self.assertIsNone(request_json_mock.call_args.kwargs["headers"]) async def test_async_sessions_client_without_upo_feature_and_pagination(self): response = HttpResponse(200, httpx.Headers({}), b"{}") http = DummyAsyncHttp(response) sessions = AsyncSessionsClient(http) - with patch.object(sessions, "_request_json", AsyncMock(return_value={"ok": True})) as request_json_mock: + with patch.object( + sessions, "_request_json", AsyncMock(return_value={"ok": True}) + ) as request_json_mock: await sessions.open_online_session({"a": 1}, access_token="token") await sessions.open_batch_session({"a": 1}, access_token="token") await sessions.get_session_invoices("ref", continuation_token="", access_token="token") - await sessions.get_session_failed_invoices("ref", continuation_token="", access_token="token") + await sessions.get_session_failed_invoices( + "ref", continuation_token="", access_token="token" + ) self.assertIsNone(request_json_mock.call_args_list[0].kwargs["headers"]) self.assertIsNone(request_json_mock.call_args_list[1].kwargs["headers"]) diff --git a/tests/test_openapi_models.py b/tests/test_openapi_models.py index 82ef6d2..7e4b104 100644 --- a/tests/test_openapi_models.py +++ b/tests/test_openapi_models.py @@ -1,8 +1,7 @@ import json -from typing import List import unittest -from unittest.mock import patch from pathlib import Path +from unittest.mock import patch from ksef_client import openapi_models as m @@ -119,7 +118,7 @@ def test_part_upload_request_headers_keep_non_string_values(self): self.assertEqual(parsed.to_dict()["headers"], payload["headers"]) def test_convert_value_handles_unsubscripted_list_hint(self): - self.assertEqual(m._convert_value(List, ["a", "b"]), ["a", "b"]) + self.assertEqual(m._convert_value(list, ["a", "b"]), ["a", "b"]) def test_convert_value_openapi_model_non_dict_returns_raw_value(self): self.assertEqual(m._convert_value(m.AuthenticationListItem, "raw"), "raw")