diff --git a/README.md b/README.md
index 1b2461a..7e21831 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,10 @@
SDK zostało zaprojektowane w oparciu o oficjalne biblioteki referencyjne KSeF dla ekosystemów **Java** oraz **C#/.NET**, z naciskiem na zachowanie spójności pojęć oraz przepływów (workflow).
+## 🔄 Kompatybilność API KSeF
+
+Aktualna kompatybilność: **KSeF API `v2.1.1`** ([api-changelog.md](https://github.com/CIRFMF/ksef-docs/blob/2.1.1/api-changelog.md)).
+
## ✅ Funkcjonalności
- Klienci API (`KsefClient`, `AsyncKsefClient`) mapujący wywołania na endpointy KSeF.
@@ -104,6 +108,7 @@ tokens = AuthCoordinator(client.auth).authenticate_with_xades_key_pair(
context_identifier_type="nip",
context_identifier_value="5265877635",
subject_identifier_type="certificateSubject",
+ enforce_xades_compliance=False, # ustaw True, aby dodać X-KSeF-Feature: enforce-xades-compliance
).tokens
```
@@ -168,7 +173,76 @@ Uruchomienie testów z kontrolą pokrycia:
pytest --cov=ksef_client --cov-report=term-missing --cov-fail-under=100
```
-Testy E2E (marker `e2e`) są wyłączone w standardowym przebiegu i wymagają osobnej konfiguracji środowiska oraz danych dostępowych.
+Testy E2E (marker `e2e`) są wyłączone w standardowym przebiegu i wymagają osobnej konfiguracji
+środowiska oraz danych dostępowych.
+
+Scenariusz E2E obejmuje:
+- logowanie tokenem KSeF,
+- logowanie certyfikatem (XAdES),
+- wystawienie faktury,
+- pobranie UPO,
+- listowanie faktur,
+- pobranie ostatniej faktury.
+
+Plik testów E2E:
+- `tests/test_e2e_token_flows.py`
+
+Dostępne testy:
+- `test_e2e_test_environment_full_flow_token`
+- `test_e2e_test_environment_full_flow_xades`
+- `test_e2e_demo_environment_full_flow_token`
+- `test_e2e_demo_environment_full_flow_xades`
+
+Lokalne uruchomienie (token, TEST):
+
+```bash
+KSEF_E2E=1 \
+KSEF_TEST_TOKEN=... \
+KSEF_TEST_CONTEXT_TYPE=nip \
+KSEF_TEST_CONTEXT_VALUE=... \
+pytest tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_token
+```
+
+Lokalne uruchomienie (XAdES, TEST):
+
+```bash
+KSEF_E2E=1 \
+KSEF_TEST_CONTEXT_TYPE=nip \
+KSEF_TEST_CONTEXT_VALUE=... \
+KSEF_TEST_XADES_CERT_PEM="$(cat cert.pem)" \
+KSEF_TEST_XADES_PRIVATE_KEY_PEM="$(cat key.pem)" \
+pytest tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_xades
+```
+
+W GitHub Actions testy E2E uruchamia workflow:
+- `.github/workflows/python-e2e.yml`
+
+Workflow uruchamia się:
+- na `push` (dowolny branch),
+- na `pull_request` do `main`,
+- ręcznie przez `workflow_dispatch`.
+
+Repozytoryjne sekrety do ustawienia:
+- `KSEF_TEST_TOKEN`, `KSEF_TEST_CONTEXT_TYPE`, `KSEF_TEST_CONTEXT_VALUE` (token TEST)
+- `KSEF_DEMO_TOKEN`, `KSEF_DEMO_CONTEXT_TYPE`, `KSEF_DEMO_CONTEXT_VALUE` (token DEMO)
+- `KSEF_TEST_XADES_CERT_PEM` albo `KSEF_TEST_XADES_CERT_PEM_B64` (XAdES TEST)
+- `KSEF_TEST_XADES_PRIVATE_KEY_PEM` albo `KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64` (XAdES TEST)
+- `KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject`
+- `KSEF_DEMO_XADES_CERT_PEM` albo `KSEF_DEMO_XADES_CERT_PEM_B64` (XAdES DEMO)
+- `KSEF_DEMO_XADES_PRIVATE_KEY_PEM` albo `KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64` (XAdES DEMO)
+- `KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject`
+
+Przygotowanie sekretów PEM w wariancie Base64 (jedna linia):
+
+```bash
+base64 < cert.pem | tr -d '\n'
+base64 < key.pem | tr -d '\n'
+```
+
+Anonimizacja w CI:
+- dane uwierzytelniające są pobierane wyłącznie z `GitHub Secrets`,
+- wartości sekretów są maskowane w logach (`::add-mask::`),
+- testy nie logują tokenów, certyfikatów ani identyfikatorów kontekstu.
## 🤝 Kontrybucja
diff --git a/docs/README.md b/docs/README.md
index 029fac3..4c79a1b 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -4,6 +4,8 @@ Dokumentacja opisuje **publiczne API** biblioteki `ksef-client-python` (import:
Opis kontraktu API (OpenAPI) oraz dokumenty procesowe i ograniczenia systemu znajdują się w `ksef-docs/`.
+Kompatybilność SDK: **KSeF API `v2.1.1`**.
+
## Wymagania
- Python `>= 3.10`
diff --git a/docs/api/auth.md b/docs/api/auth.md
index d3c2a10..dbeed90 100644
--- a/docs/api/auth.md
+++ b/docs/api/auth.md
@@ -37,12 +37,13 @@ Zwraca JSON z polami m.in.:
Challenge jest ważny **10 minut**. W przypadku dłuższego procesu podpisu (np. HSM) należy pobrać nowe wyzwanie.
-## `submit_xades_auth_request(signed_xml, verify_certificate_chain=None)`
+## `submit_xades_auth_request(signed_xml, verify_certificate_chain=None, enforce_xades_compliance=False)`
Endpoint: `POST /auth/xades-signature` (bez `accessToken`).
- `signed_xml` – podpisany XML `AuthTokenRequest`
- `verify_certificate_chain` – opcjonalna weryfikacja łańcucha certyfikatów (przydatne w TE)
+- `enforce_xades_compliance` – gdy `True`, dodaje nagłówek `X-KSeF-Feature: enforce-xades-compliance` (przydatne do wcześniejszej walidacji reguł XAdES na DEMO/PROD)
Zwraca (202) obiekt inicjujący auth, m.in. `referenceNumber` i `authenticationToken`.
diff --git a/docs/api/testdata.md b/docs/api/testdata.md
index 0b1fc5d..aba802d 100644
--- a/docs/api/testdata.md
+++ b/docs/api/testdata.md
@@ -58,6 +58,20 @@ Endpoint: `POST /testdata/attachment/revoke`
Wyłącza obsługę załączników w konfiguracji testowej.
+## Blokada uwierzytelnienia kontekstu
+
+### `block_context_authentication(request_payload, access_token=None)`
+
+Endpoint: `POST /testdata/context/block`
+
+Blokuje możliwość uwierzytelniania dla wskazanego kontekstu (auth kończy się kodem `480`).
+
+### `unblock_context_authentication(request_payload, access_token=None)`
+
+Endpoint: `POST /testdata/context/unblock`
+
+Odblokowuje możliwość uwierzytelniania dla wskazanego kontekstu.
+
## Limity i rate limits (test)
### `change_session_limits(request_payload, access_token)`
diff --git a/docs/services/workflows.md b/docs/services/workflows.md
index 06cafc4..dfd61ea 100644
--- a/docs/services/workflows.md
+++ b/docs/services/workflows.md
@@ -49,6 +49,7 @@ Najczęściej używane parametry:
- `context_identifier_value`: np. NIP
- `subject_identifier_type`: np. `"certificateSubject"` (lub `"certificateFingerprint"`)
- `verify_certificate_chain`: przydatne w TE
+- `enforce_xades_compliance`: gdy `True`, ustawia nagłówek `X-KSeF-Feature: enforce-xades-compliance`
- `poll_interval_seconds`, `max_attempts`: parametry pollingu
### `AuthCoordinator.authenticate_with_ksef_token(...) -> AuthResult`
diff --git a/docs/workflows/auth.md b/docs/workflows/auth.md
index 478a9d6..888a8aa 100644
--- a/docs/workflows/auth.md
+++ b/docs/workflows/auth.md
@@ -65,12 +65,15 @@ result = AuthCoordinator(client.auth).authenticate_with_xades_key_pair(
context_identifier_value="5265877635",
subject_identifier_type="certificateSubject",
verify_certificate_chain=None,
+ enforce_xades_compliance=False,
max_attempts=90,
poll_interval_seconds=2.0,
)
access_token = result.tokens.access_token.token
```
+`enforce_xades_compliance=True` wymusza dodanie nagłówka `X-KSeF-Feature: enforce-xades-compliance` podczas `POST /auth/xades-signature`.
+
W przypadku posiadania certyfikatu i klucza jako osobnych plików dostępne jest również wczytanie przez `XadesKeyPair.from_pem_files(...)`.
Challenge jest ważny ok. 10 minut. W przypadku dłuższego procesu podpisu (np. HSM/podpis kwalifikowany) wymagane jest pobranie nowego wyzwania.
diff --git a/src/ksef_client/clients/auth.py b/src/ksef_client/clients/auth.py
index 706c1f3..ecfe08b 100644
--- a/src/ksef_client/clients/auth.py
+++ b/src/ksef_client/clients/auth.py
@@ -4,6 +4,9 @@
from .base import AsyncBaseApiClient, BaseApiClient
+_KSEF_FEATURE_HEADER = "X-KSeF-Feature"
+_ENFORCE_XADES_COMPLIANCE_FEATURE = "enforce-xades-compliance"
+
class AuthClient(BaseApiClient):
def get_active_sessions(
@@ -51,6 +54,7 @@ def submit_xades_auth_request(
signed_xml: str,
*,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
) -> Any:
params = {}
if verify_certificate_chain is not None:
@@ -59,6 +63,8 @@ def submit_xades_auth_request(
"Content-Type": "application/xml",
"Accept": "application/json",
}
+ if enforce_xades_compliance:
+ headers[_KSEF_FEATURE_HEADER] = _ENFORCE_XADES_COMPLIANCE_FEATURE
response_bytes = self._request_bytes(
"POST",
"/auth/xades-signature",
@@ -152,6 +158,7 @@ async def submit_xades_auth_request(
signed_xml: str,
*,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
) -> Any:
params = {}
if verify_certificate_chain is not None:
@@ -160,6 +167,8 @@ async def submit_xades_auth_request(
"Content-Type": "application/xml",
"Accept": "application/json",
}
+ if enforce_xades_compliance:
+ headers[_KSEF_FEATURE_HEADER] = _ENFORCE_XADES_COMPLIANCE_FEATURE
response_bytes = await self._request_bytes(
"POST",
"/auth/xades-signature",
diff --git a/src/ksef_client/clients/testdata.py b/src/ksef_client/clients/testdata.py
index 33be1c4..6dd2f04 100644
--- a/src/ksef_client/clients/testdata.py
+++ b/src/ksef_client/clients/testdata.py
@@ -96,6 +96,28 @@ def disable_attachment(
expected_status={200, 204},
)
+ def block_context_authentication(
+ self, request_payload: dict[str, Any], *, access_token: str | None = None
+ ) -> None:
+ self._request_json(
+ "POST",
+ "/testdata/context/block",
+ json=request_payload,
+ access_token=access_token,
+ expected_status={200},
+ )
+
+ def unblock_context_authentication(
+ self, request_payload: dict[str, Any], *, access_token: str | None = None
+ ) -> None:
+ self._request_json(
+ "POST",
+ "/testdata/context/unblock",
+ json=request_payload,
+ access_token=access_token,
+ expected_status={200},
+ )
+
def change_session_limits(self, request_payload: dict[str, Any], *, access_token: str) -> Any:
return self._request_json(
"POST",
@@ -240,6 +262,28 @@ async def disable_attachment(
expected_status={200, 204},
)
+ async def block_context_authentication(
+ self, request_payload: dict[str, Any], *, access_token: str | None = None
+ ) -> None:
+ await self._request_json(
+ "POST",
+ "/testdata/context/block",
+ json=request_payload,
+ access_token=access_token,
+ expected_status={200},
+ )
+
+ async def unblock_context_authentication(
+ self, request_payload: dict[str, Any], *, access_token: str | None = None
+ ) -> None:
+ await self._request_json(
+ "POST",
+ "/testdata/context/unblock",
+ json=request_payload,
+ access_token=access_token,
+ expected_status={200},
+ )
+
async def change_session_limits(
self, request_payload: dict[str, Any], *, access_token: str
) -> Any:
diff --git a/src/ksef_client/openapi_models.py b/src/ksef_client/openapi_models.py
index e0a7879..abe1421 100644
--- a/src/ksef_client/openapi_models.py
+++ b/src/ksef_client/openapi_models.py
@@ -101,6 +101,12 @@ class AuthenticationMethod(Enum):
PERSONALSIGNATURE = "PersonalSignature"
PEPPOLSIGNATURE = "PeppolSignature"
+class AuthenticationMethodCategory(Enum):
+ XADESSIGNATURE = "XadesSignature"
+ NATIONALNODE = "NationalNode"
+ TOKEN = "Token"
+ OTHER = "Other"
+
class AuthenticationTokenStatus(Enum):
PENDING = "Pending"
ACTIVE = "Active"
@@ -617,6 +623,12 @@ class SubunitPermissionsSubunitIdentifierType(Enum):
INTERNALID = "InternalId"
NIP = "Nip"
+class TestDataAuthenticationContextIdentifierType(Enum):
+ NIP = "Nip"
+ INTERNALID = "InternalId"
+ NIPVATUE = "NipVatUe"
+ PEPPOLID = "PeppolId"
+
class TestDataAuthorizedIdentifierType(Enum):
NIP = "Nip"
PESEL = "Pesel"
@@ -737,6 +749,7 @@ class AuthenticationInitResponse(OpenApiModel):
@dataclass(frozen=True)
class AuthenticationListItem(OpenApiModel):
authenticationMethod: AuthenticationMethod
+ authenticationMethodInfo: AuthenticationMethodInfo
referenceNumber: ReferenceNumber
startDate: str
status: StatusInfo
@@ -750,9 +763,16 @@ class AuthenticationListResponse(OpenApiModel):
items: list[AuthenticationListItem]
continuationToken: Optional[str] = None
+@dataclass(frozen=True)
+class AuthenticationMethodInfo(OpenApiModel):
+ category: AuthenticationMethodCategory
+ code: str
+ displayName: str
+
@dataclass(frozen=True)
class AuthenticationOperationStatusResponse(OpenApiModel):
authenticationMethod: AuthenticationMethod
+ authenticationMethodInfo: AuthenticationMethodInfo
startDate: str
status: StatusInfo
isTokenRedeemed: Optional[bool] = None
@@ -796,6 +816,10 @@ class BatchSessionEffectiveContextLimits(OpenApiModel):
maxInvoiceWithAttachmentSizeInMB: int
maxInvoices: int
+@dataclass(frozen=True)
+class BlockContextAuthenticationRequest(OpenApiModel):
+ contextIdentifier: Optional[TestDataAuthenticationContextIdentifier] = None
+
@dataclass(frozen=True)
class CertificateEffectiveSubjectLimits(OpenApiModel):
maxCertificates: Optional[int] = None
@@ -1768,6 +1792,11 @@ class SubunitPermissionsSubunitIdentifier(OpenApiModel):
type: SubunitPermissionsSubunitIdentifierType
value: str
+@dataclass(frozen=True)
+class TestDataAuthenticationContextIdentifier(OpenApiModel):
+ type: TestDataAuthenticationContextIdentifierType
+ value: str
+
@dataclass(frozen=True)
class TestDataAuthorizedIdentifier(OpenApiModel):
type: TestDataAuthorizedIdentifierType
@@ -1825,6 +1854,10 @@ class TokenStatusResponse(OpenApiModel):
class TooManyRequestsResponse(OpenApiModel):
status: dict[str, Any]
+@dataclass(frozen=True)
+class UnblockContextAuthenticationRequest(OpenApiModel):
+ contextIdentifier: Optional[TestDataAuthenticationContextIdentifier] = None
+
@dataclass(frozen=True)
class UpoPageResponse(OpenApiModel):
downloadUrl: str
diff --git a/src/ksef_client/services/workflows.py b/src/ksef_client/services/workflows.py
index 905ec7c..f737dab 100644
--- a/src/ksef_client/services/workflows.py
+++ b/src/ksef_client/services/workflows.py
@@ -98,6 +98,7 @@ def submit_xades_auth_request(
signed_xml: str,
*,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
) -> dict[str, Any] | None: ...
def submit_ksef_token_auth(self, request_payload: dict[str, Any]) -> dict[str, Any] | None: ...
@@ -117,6 +118,7 @@ async def submit_xades_auth_request(
signed_xml: str,
*,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
) -> dict[str, Any] | None: ...
async def submit_ksef_token_auth(
@@ -300,6 +302,7 @@ def authenticate_with_xades_key_pair(
context_identifier_value: str,
subject_identifier_type: str,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
authorization_policy_xml: str | None = None,
poll_interval_seconds: float = 2.0,
max_attempts: int = 30,
@@ -311,6 +314,7 @@ def authenticate_with_xades_key_pair(
certificate_pem=key_pair.certificate_pem,
private_key_pem=key_pair.private_key_pem,
verify_certificate_chain=verify_certificate_chain,
+ enforce_xades_compliance=enforce_xades_compliance,
authorization_policy_xml=authorization_policy_xml,
poll_interval_seconds=poll_interval_seconds,
max_attempts=max_attempts,
@@ -325,6 +329,7 @@ def authenticate_with_xades(
certificate_pem: str,
private_key_pem: str,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
authorization_policy_xml: str | None = None,
poll_interval_seconds: float = 2.0,
max_attempts: int = 30,
@@ -341,7 +346,9 @@ def authenticate_with_xades(
signed_xml = sign_xades_enveloped(xml, certificate_pem, private_key_pem)
init = self._auth.submit_xades_auth_request(
- signed_xml, verify_certificate_chain=verify_certificate_chain
+ signed_xml,
+ verify_certificate_chain=verify_certificate_chain,
+ enforce_xades_compliance=enforce_xades_compliance,
)
if init is None:
raise RuntimeError("submit_xades_auth_request returned empty response")
@@ -435,6 +442,7 @@ async def authenticate_with_xades_key_pair(
context_identifier_value: str,
subject_identifier_type: str,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
authorization_policy_xml: str | None = None,
poll_interval_seconds: float = 2.0,
max_attempts: int = 30,
@@ -446,6 +454,7 @@ async def authenticate_with_xades_key_pair(
certificate_pem=key_pair.certificate_pem,
private_key_pem=key_pair.private_key_pem,
verify_certificate_chain=verify_certificate_chain,
+ enforce_xades_compliance=enforce_xades_compliance,
authorization_policy_xml=authorization_policy_xml,
poll_interval_seconds=poll_interval_seconds,
max_attempts=max_attempts,
@@ -460,6 +469,7 @@ async def authenticate_with_xades(
certificate_pem: str,
private_key_pem: str,
verify_certificate_chain: bool | None = None,
+ enforce_xades_compliance: bool = False,
authorization_policy_xml: str | None = None,
poll_interval_seconds: float = 2.0,
max_attempts: int = 30,
@@ -476,7 +486,9 @@ async def authenticate_with_xades(
signed_xml = sign_xades_enveloped(xml, certificate_pem, private_key_pem)
init = await self._auth.submit_xades_auth_request(
- signed_xml, verify_certificate_chain=verify_certificate_chain
+ signed_xml,
+ verify_certificate_chain=verify_certificate_chain,
+ enforce_xades_compliance=enforce_xades_compliance,
)
if init is None:
raise RuntimeError("submit_xades_auth_request returned empty response")
diff --git a/tests/test_clients.py b/tests/test_clients.py
index e340bb0..a492aa8 100644
--- a/tests/test_clients.py
+++ b/tests/test_clients.py
@@ -47,7 +47,7 @@ def test_auth_client(self):
client,
"_request_bytes",
Mock(side_effect=[b'{"status": "ok"}', b""]),
- ),
+ ) as request_bytes_mock,
):
client.get_active_sessions(
page_size=10, continuation_token="cont", access_token="token"
@@ -55,7 +55,15 @@ def test_auth_client(self):
client.revoke_current_session("token")
client.revoke_session("ref", "token")
client.get_challenge()
- result = client.submit_xades_auth_request("", verify_certificate_chain=True)
+ result = client.submit_xades_auth_request(
+ "",
+ verify_certificate_chain=True,
+ enforce_xades_compliance=True,
+ )
+ self.assertEqual(
+ request_bytes_mock.call_args_list[0].kwargs["headers"].get("X-KSeF-Feature"),
+ "enforce-xades-compliance",
+ )
self.assertEqual(result["status"], "ok")
self.assertIsNone(client.submit_xades_auth_request(""))
client.submit_ksef_token_auth({"a": 1})
@@ -213,6 +221,8 @@ def test_testdata_client(self):
client.revoke_permissions(payload)
client.enable_attachment(payload)
client.disable_attachment(payload)
+ client.block_context_authentication(payload)
+ client.unblock_context_authentication(payload)
client.change_session_limits(payload, access_token="token")
client.reset_session_limits(access_token="token")
client.change_certificate_limits(payload, access_token="token")
@@ -239,7 +249,7 @@ async def test_async_clients(self):
auth,
"_request_bytes",
AsyncMock(side_effect=[b'{"status": "ok"}', b""]),
- ),
+ ) as request_bytes_mock,
):
await auth.get_active_sessions(
page_size=10,
@@ -249,7 +259,15 @@ async def test_async_clients(self):
await auth.revoke_current_session("token")
await auth.revoke_session("ref", "token")
await auth.get_challenge()
- result = await auth.submit_xades_auth_request("", verify_certificate_chain=True)
+ result = await auth.submit_xades_auth_request(
+ "",
+ verify_certificate_chain=True,
+ enforce_xades_compliance=True,
+ )
+ self.assertEqual(
+ request_bytes_mock.call_args_list[0].kwargs["headers"].get("X-KSeF-Feature"),
+ "enforce-xades-compliance",
+ )
self.assertEqual(result["status"], "ok")
self.assertIsNone(
await auth.submit_xades_auth_request(
@@ -444,6 +462,8 @@ async def test_async_clients(self):
await testdata.revoke_permissions(payload)
await testdata.enable_attachment(payload)
await testdata.disable_attachment(payload)
+ await testdata.block_context_authentication(payload)
+ await testdata.unblock_context_authentication(payload)
await testdata.change_session_limits(payload, access_token="token")
await testdata.reset_session_limits(access_token="token")
await testdata.change_certificate_limits(payload, access_token="token")
diff --git a/tests/test_openapi_models.py b/tests/test_openapi_models.py
index 5d271f4..03b92e8 100644
--- a/tests/test_openapi_models.py
+++ b/tests/test_openapi_models.py
@@ -10,8 +10,14 @@ def test_from_dict_none(self):
def test_enum_and_simple_model(self):
method_value = list(m.AuthenticationMethod)[0].value
+ category_value = list(m.AuthenticationMethodCategory)[0].value
data = {
"authenticationMethod": method_value,
+ "authenticationMethodInfo": {
+ "category": category_value,
+ "code": "auth.method.code",
+ "displayName": "Auth method display name",
+ },
"referenceNumber": "ref-1",
"startDate": "2024-01-01",
"status": {"code": 200, "description": "ok"},
diff --git a/tests/test_services_workflows.py b/tests/test_services_workflows.py
index 29e6b38..1ca3dcf 100644
--- a/tests/test_services_workflows.py
+++ b/tests/test_services_workflows.py
@@ -38,11 +38,18 @@ async def request(self, *args, **kwargs) -> HttpResponse:
@dataclass
class StubAuthClient:
codes: list[int]
+ last_enforce_xades_compliance: bool = False
def get_challenge(self):
return {"challenge": "c", "timestampMs": 123}
- def submit_xades_auth_request(self, signed_xml: str, verify_certificate_chain=None):
+ def submit_xades_auth_request(
+ self,
+ signed_xml: str,
+ verify_certificate_chain=None,
+ enforce_xades_compliance: bool = False,
+ ):
+ self.last_enforce_xades_compliance = enforce_xades_compliance
return {"referenceNumber": "ref", "authenticationToken": {"token": "auth"}}
def submit_ksef_token_auth(self, payload):
@@ -59,11 +66,18 @@ def redeem_token(self, authentication_token):
@dataclass
class StubAsyncAuthClient:
codes: list[int]
+ last_enforce_xades_compliance: bool = False
async def get_challenge(self):
return {"challenge": "c", "timestampMs": 123}
- async def submit_xades_auth_request(self, signed_xml: str, verify_certificate_chain=None):
+ async def submit_xades_auth_request(
+ self,
+ signed_xml: str,
+ verify_certificate_chain=None,
+ enforce_xades_compliance: bool = False,
+ ):
+ self.last_enforce_xades_compliance = enforce_xades_compliance
return {"referenceNumber": "ref", "authenticationToken": {"token": "auth"}}
async def submit_ksef_token_auth(self, payload):
@@ -193,6 +207,22 @@ def test_auth_coordinator_success_and_errors(self):
max_attempts=2,
)
self.assertEqual(result.authentication_token, "auth")
+ self.assertFalse(auth.last_enforce_xades_compliance)
+
+ auth_with_feature = StubAuthClient([200])
+ coord_with_feature = workflows.AuthCoordinator(auth_with_feature)
+ with patch("ksef_client.services.xades.sign_xades_enveloped", return_value="signed"):
+ coord_with_feature.authenticate_with_xades(
+ context_identifier_type="nip",
+ context_identifier_value="123",
+ subject_identifier_type="certificateSubject",
+ certificate_pem=rsa_cert.certificate_pem,
+ private_key_pem=rsa_cert.private_key_pem,
+ enforce_xades_compliance=True,
+ poll_interval_seconds=0,
+ max_attempts=1,
+ )
+ self.assertTrue(auth_with_feature.last_enforce_xades_compliance)
with patch("ksef_client.services.xades.sign_xades_enveloped", return_value="signed"):
coord_pair = workflows.AuthCoordinator(StubAuthClient([100, 200]))
@@ -243,7 +273,12 @@ def test_auth_coordinator_ksef_token(self):
self.assertEqual(result.tokens.access_token.token, "acc")
class StubAuthClientNoneXades(StubAuthClient):
- def submit_xades_auth_request(self, signed_xml: str, verify_certificate_chain=None):
+ def submit_xades_auth_request(
+ self,
+ signed_xml: str,
+ verify_certificate_chain=None,
+ enforce_xades_compliance: bool = False,
+ ):
return None
rsa_cert = generate_rsa_cert()
@@ -387,6 +422,22 @@ async def test_async_auth_coordinator(self):
max_attempts=1,
)
self.assertEqual(result_xades.authentication_token, "auth")
+ self.assertFalse(auth_xades.last_enforce_xades_compliance)
+
+ auth_xades_feature = StubAsyncAuthClient([200])
+ coord_xades_feature = workflows.AsyncAuthCoordinator(auth_xades_feature)
+ with patch("ksef_client.services.xades.sign_xades_enveloped", return_value="signed"):
+ await coord_xades_feature.authenticate_with_xades(
+ context_identifier_type="nip",
+ context_identifier_value="123",
+ subject_identifier_type="certificateSubject",
+ certificate_pem=rsa_cert.certificate_pem,
+ private_key_pem=rsa_cert.private_key_pem,
+ enforce_xades_compliance=True,
+ poll_interval_seconds=0,
+ max_attempts=1,
+ )
+ self.assertTrue(auth_xades_feature.last_enforce_xades_compliance)
with patch("ksef_client.services.xades.sign_xades_enveloped", return_value="signed"):
coord_xades_pair = workflows.AsyncAuthCoordinator(StubAsyncAuthClient([200]))
@@ -424,7 +475,10 @@ async def get_auth_status(self, reference_number, authentication_token):
class StubAsyncAuthClientNoneXades(StubAsyncAuthClient):
async def submit_xades_auth_request(
- self, signed_xml: str, verify_certificate_chain=None
+ self,
+ signed_xml: str,
+ verify_certificate_chain=None,
+ enforce_xades_compliance: bool = False,
):
return None