diff --git a/README.md b/README.md index 22f648a..cf61df8 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,49 @@ client.set_user_enabled(user.Id, enabled=True) ### Bitwarden client +#### Login/… creation & lookup +```python +import urllib.parse +import secrets +from vaultwarden.models.bitwarden import Login, LoginData, UriMatch, UriMatchDetection +from vaultwarden.clients.bitwarden import BitwardenAPIClient + +bitwarden_client = BitwardenAPIClient(url="http://127.0.0.1", + email="test-account@example.com", + password="test-account", + client_id="user.a8be340c-856b-481f-8183-2b7712995da2", + client_secret="ag66paVUq4h7tBLbCbJOY5tJkQvUuT", + device_id="e54ba5f5-7d58-4830-8f2b-99194c70c14f") +bitwarden_client.sync() + +# create +uri = urllib.parse.urlparse(url:="http://username:password@login.example.org") +key = secrets.token_bytes(64) + +data = LoginData.model_construct( + name=uri.hostname, + password=uri.username, + username=uri.password, + uris = [UriMatch.model_construct(match = UriMatchDetection.HOST, uri=url)] +) +item = Login.model_construct( + name=f"{uri.username}@{uri.hostname}", + login=data, + data=data, + key=key, +) + +bitwarden_client.create_item(item, None, None) + +# refresh cache +bitwarden_client.sync(force_refresh=True) + +# lookup +print(list(bitwarden_client.search_items(name="login.example."))) +print(list(bitwarden_client.search_items(uri="http://login.example.org"))) +``` + +#### User / Org / Collection Management ```python from vaultwarden.clients.bitwarden import BitwardenAPIClient from vaultwarden.models.bitwarden import Organization, OrganizationCollection, get_organization @@ -89,6 +132,7 @@ if my_user: ``` + ## Compatibility This library is compatible with vaultwarden 1.32.0 and above. diff --git a/pyproject.toml b/pyproject.toml index b548d51..ef0fb14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,11 @@ test = [ "pytest~=8.3", ] +[tool.pytest.ini_options] +filterwarnings = [ + "error" +] + [tool.hatch.version] path = "src/vaultwarden/__version__.py" @@ -68,6 +73,7 @@ packages = [ [tool.hatch.envs.test] dependencies = [ "coverage", + "pytest" ] [tool.hatch.envs.test.scripts] diff --git a/src/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py index 3650b11..8ce583b 100644 --- a/src/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -1,19 +1,35 @@ +import typing from typing import Literal from uuid import UUID from httpx import Client, Response +from vaultwarden.models.bitwarden import ( + CipherDetail, + CipherDetails, + Organization, + OrganizationCollection, + OrgData, + RegisterData, + get_organization, +) +from vaultwarden.models.crypto import CryptoContext from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.sync import ConnectToken, SyncData -from vaultwarden.utils.crypto import make_master_key +from vaultwarden.utils.crypto import masterPasswordHash from vaultwarden.utils.logger import log_raise_for_status +if typing.TYPE_CHECKING: + from vaultwarden.models.bitwarden import ( + Kdf, + ) + class BitwardenAPIClient: def __init__( self, url: str, - email: str, + email: str | None, password: str, client_id: str, client_secret: str, @@ -23,7 +39,7 @@ def __init__( # if one of the parameters is None, raise an exception if not all([url, password, client_id, client_secret, device_id]): raise BitwardenError("All parameters are required") - self.email = email + self.email: str | None = email self.password = password self.client_id = client_id self.client_secret = client_secret @@ -38,6 +54,9 @@ def __init__( self._connect_token: ConnectToken | None = None self._sync: SyncData | None = None + def close(self): + self._http_client.close() + @property def connect_token(self) -> ConnectToken | None: return self._connect_token @@ -46,6 +65,12 @@ def connect_token(self) -> ConnectToken | None: def connect_token(self, value: ConnectToken): self._connect_token = value + @property + def masterPasswordHash(self): # noqa: N802 + return masterPasswordHash( + self._connect_token._master_key, self.password + ) + # refresh connect token if expired def _refresh_connect_token(self): if ( @@ -53,34 +78,15 @@ def _refresh_connect_token(self): or self.connect_token.refresh_token is None ): self._set_connect_token() - return - headers = { - "content-type": "application/x-www-form-urlencoded; charset=utf-8", - } - payload = { - "grant_type": "refresh_token", - "refresh_token": self.connect_token.refresh_token, - } - resp = self._http_client.post( - "identity/connect/token", headers=headers, data=payload - ) - self._connect_token = ConnectToken.model_validate_json(resp.text) - - import vaultwarden.models.bitwarden - - self._connect_token.master_key = make_master_key( - password=self.password, - salt=self.email, - kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( - self._connect_token - ), - ) + else: + payload = { + "grant_type": "refresh_token", + "refresh_token": self.connect_token.refresh_token, + } + self._set_connect_token(payload) - def _set_connect_token(self): - headers = { - "content-type": "application/x-www-form-urlencoded; charset=utf-8", - } - payload = { + def _set_connect_token(self, refresh: dict | None = None): + payload = refresh or { "grant_type": "client_credentials", "client_secret": f"{self.client_secret}", "client_id": f"{self.client_id}", @@ -90,31 +96,28 @@ def _set_connect_token(self): "deviceIdentifier": f"{self.device_id}", "deviceName": "python-vaultwarden", } + headers = { + "content-type": "application/x-www-form-urlencoded; charset=utf-8", + } resp = self._http_client.post( "identity/connect/token", headers=headers, data=payload ) - self._connect_token = ConnectToken.model_validate_json(resp.text) - if self.email is None: + access_token = resp.json()["access_token"] headers = { - "Authorization": f"Bearer {self._connect_token.access_token}", + "Authorization": f"Bearer {access_token}", "content-type": "application/json; charset=utf-8", "Accept": "*/*", } - resp = self._http_client.get( + mresp = self._http_client.get( "api/accounts/profile", headers=headers ) - self.email = resp.json()["email"] - - import vaultwarden.models.bitwarden + self.email = mresp.json()["email"] - self._connect_token.master_key = make_master_key( - password=self.password, - salt=self.email, - kdf=vaultwarden.models.bitwarden.Kdf.from_connect_token( - self._connect_token - ), + self._connect_token = ConnectToken.model_validate_json( + resp.text, context=CryptoContext(client=self) ) + return # login to api @@ -145,9 +148,12 @@ def _api_request( raise BitwardenError("Fail to connect") headers = { "Authorization": f"Bearer {self.connect_token.access_token}", - "content-type": "application/json; charset=utf-8", "Accept": "*/*", } + + if kwargs.get("json") is not None: + headers["content-type"] = "application/json; charset=utf-8" + return self._http_client.request( method, path, headers=headers, **kwargs ) @@ -155,5 +161,209 @@ def _api_request( def sync(self, force_refresh: bool = False) -> SyncData: if self._sync is None or force_refresh: resp = self._api_request("GET", "api/sync") - self._sync = SyncData.model_validate_json(resp.text) + return self._sync_step(resp.json()) + return self._sync + + def _sync_step(self, data: dict) -> SyncData: + v: dict[str, typing.Any] = { + "profile": data.get("profile") or data.get("Profile"), + "ciphers": [], + "collections": [], + "folders": [], + "policies": [], + "sends": [], + "domains": {}, + } + # populate self._sync.Profile + self._sync = SyncData.model_validate( + v, context=CryptoContext(client=self) + ) + # uses self._sync.Profile + self._sync = SyncData.model_validate( + data, + context=CryptoContext(client=self), + ) return self._sync + + def create_organization( + self, + name: str, + email: str, + default_collection_name: str = "DefaultCollection", + ) -> Organization: + if not self.connect_token: + raise BitwardenError("Not connected") + assert self._connect_token + + from secrets import token_bytes + + req = OrgData.model_construct( + Name=name, + BillingEmail=email, + CollectionName=default_collection_name, + PlanType=0, + Key=token_bytes(64), + ) + ctx = CryptoContext(client=self) + ctx.push(self._connect_token.PrivateKey) + data = req.model_dump( + mode="json", + by_alias=True, + exclude_none=True, + exclude_unset=True, + context=ctx, + ) + v = self.api_request("POST", "api/organizations", json=data) + return Organization.model_validate( + v.json(), context=CryptoContext(client=self) + ) + + # def get_organization(self, name) -> "Organization": + # pass + + def create_user( + self, + email: str, + password: str, + name, + kdf: "Kdf", + ): + assert email == email.lower(), "email is not lowercase" + assert len(password) >= 8, "password is too short (< 8 characters)" + + rd = RegisterData.model_construct( + email=email, + password=password, + name=name, + **kdf.model_dump(mode="json", by_alias=True), + ) + data = rd.model_dump( + mode="json", + by_alias=True, + exclude_none=True, + exclude_unset=True, + context=CryptoContext(client=self), + ) + resp = self._api_request("POST", "api/accounts/register", json=data) + # user = self._api_request("GET", f"api/users/{email}") + return resp + + def search_items( + self, + uri: str | None = None, + name: str | None = None, + organisations: list[Organization] | None = None, + collections: list[OrganizationCollection] | None = None, + types: list[type[CipherDetails]] | None = None, + ) -> typing.Generator["CipherDetails", None, None]: + selectors: list[typing.Callable[["CipherDetails"], bool]] = list() + + if uri is not None: + + def by_uri(item: CipherDetails) -> bool: + return item.uri_match(uri) + + selectors.append(by_uri) + + if name is not None: + + def by_name(item: CipherDetails) -> bool: + return name in item.Name + + selectors.append(by_name) + + if organisations is not None: + + def by_organisation(item: CipherDetails) -> bool: + return item.OrganizationId in [o.Id for o in organisations] + + selectors.append(by_organisation) + + if collections is not None: + + def by_collection(item: CipherDetails) -> bool: + return ( + len( + set(item.CollectionIds) + & set([o.Id for o in collections]) + ) + > 0 + ) + + selectors.append(by_collection) + + if types is not None: + + def by_type(item: CipherDetails) -> bool: + return isinstance(item, tuple(types)) + + selectors.append(by_type) + + def select_func(item: CipherDetails) -> bool: + return all([selector(item) for selector in selectors]) + + return self.select_items(select_func) + + def select_items( + self, select_func: typing.Callable[["CipherDetails"], bool] + ) -> typing.Generator["CipherDetails", None, None]: + assert self._sync + for i in self._sync.Ciphers: + if select_func(i): + yield i + + def create_item( + self, + item: CipherDetails, + organization: Organization, + collections: list[OrganizationCollection], + ) -> "CipherDetails": + if organization: + assert organization and ( + collections is not None and len(collections) + ), (organization, collections) + path = "api/ciphers/admin" + key = organization.key() + item.OrganizationId = organization.Id + data = { + "type": item.Type, + "cipher": item.model_dump( + mode="json", + by_alias=True, + context=CryptoContext(client=self, stack=[key]), + exclude_none=True, + ), + "collectionIds": [str(i.Id) for i in collections], + } + else: + path = "api/ciphers" + assert self.connect_token is not None + key = self.connect_token.Key + data = item.model_dump( + mode="json", + by_alias=True, + context=CryptoContext(client=self, stack=[key]), + ) + + resp = self._api_request("POST", path, json=data) + return CipherDetail.validate_json( + resp.text, context=CryptoContext(client=self) + ) + + def edit_item(self, item: CipherDetails) -> "CipherDetails": + assert self.connect_token is not None + path = f"/api/ciphers/{item.Id}" + key = ( + self.connect_token.Key + if item.OrganizationId is None + else get_organization(self, item.OrganizationId).key() + ) + data = item.model_dump( + mode="json", + by_alias=True, + context=CryptoContext(client=self, stack=[key]), + ) + resp = self._api_request("PUT", path, json=data) + return CipherDetail.validate_json( + resp.text, context=CryptoContext(client=self) + ) diff --git a/src/vaultwarden/clients/vaultwarden.py b/src/vaultwarden/clients/vaultwarden.py index c5d580c..85e17c9 100644 --- a/src/vaultwarden/clients/vaultwarden.py +++ b/src/vaultwarden/clients/vaultwarden.py @@ -43,6 +43,9 @@ def __init__( if preload_users: self._load_users() + def close(self): + self._http_client.close() + def _get_admin_cookie(self) -> Cookie | None: """Get the session cookie, required to authenticate requests""" bw_cookies = ( @@ -298,3 +301,18 @@ def transfer_account_rights( permissions=user_details.Permissions, ) self.set_user_enabled(str(user.Id), enabled=False) + + # org management + def delete_organization(self, identifier: str | UUID) -> bool: + logger.info(f"Deleting {identifier} organization") + try: + self._admin_request( + "POST", + f"organizations/{identifier}/delete", + headers={"Content-Type": "application/json"}, + ) + except HTTPStatusError as e: + logger.warning(f"Failed to delete {identifier} {e}") + return False + logger.info(f"Successfully deleted org: {identifier}") + return True diff --git a/src/vaultwarden/models/bitwarden.py b/src/vaultwarden/models/bitwarden.py index 8e81b68..1b45cd1 100644 --- a/src/vaultwarden/models/bitwarden.py +++ b/src/vaultwarden/models/bitwarden.py @@ -1,6 +1,12 @@ -import dataclasses +import base64 import datetime +from enum import IntEnum +from functools import cached_property +import io +from pathlib import Path +from secrets import token_bytes import sys +import typing from typing import ( TYPE_CHECKING, Annotated, @@ -13,32 +19,50 @@ ) from uuid import UUID +from Crypto.PublicKey import RSA from pydantic import ( AliasChoices, + ConfigDict, Field, ModelWrapValidatorHandler, + PrivateAttr, TypeAdapter, - WrapValidator, + computed_field, field_validator, + model_serializer, model_validator, ) from pydantic_core.core_schema import ( - FieldValidationInfo, + SerializationInfo, + SerializerFunctionWrapHandler, ValidationInfo, - ValidatorFunctionWrapHandler, ) -from typing_extensions import Self -from vaultwarden.clients.bitwarden import BitwardenAPIClient +from vaultwarden.models.crypto import ( + CryptoContext, + RSAPublicKey, + SecretBytes, + SecretKey, + SecretOrganizationKey, + SecretString, +) from vaultwarden.models.enum import CipherType, KdfType, OrganizationUserType from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.permissive_model import PermissiveBaseModel -from vaultwarden.utils.crypto import decrypt, encrypt +from vaultwarden.utils.crypto import ( + AsymmetricCipher, + BinarySymmetricCipher, + SymmetricCipher, + make_master_key, + masterPasswordHash, + stretch_key, +) if TYPE_CHECKING: - import vaultwarden.clients.bitwarden + from vaultwarden.clients.bitwarden import BitwardenAPIClient + from vaultwarden.models.sync import ProfileOrganization -if sys.version_info < (3, 12): +if sys.version_info < (3, 11): from typing_extensions import Self else: from typing import Self @@ -49,208 +73,265 @@ T = TypeVar("T", bound="BitwardenBaseModel") +def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Any], + info: ValidationInfo, +) -> Any: + key: str + ctx: CryptoContext = cast(CryptoContext, info.context) + if (key := (data.get("key") or data.get("Key"))) is not None: + match int(key[0]): + case SymmetricCipher.TYPE: + assert isinstance(ctx.stack[-1], bytes) + v = SymmetricCipher.decode(key, ctx.stack[-1]) + case AsymmetricCipher.TYPE: + assert isinstance(ctx.stack[-1], RSA.RsaKey) + v = AsymmetricCipher.decode(key, ctx.stack[-1]) + ctx.push(v) + + r = handler(data) + + if key is not None: + ctx.pop() + + return r + + +def ser_set_key( + slf: Any, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> Any: + key: bytes | None + if (key := slf.Key) is not None: + ctx: CryptoContext = cast(CryptoContext, info.context) + ctx.push(key) + + v = handler(slf) + + if key is not None: + ctx.pop() + + return v + + class ResplistBitwarden(PermissiveBaseModel, Generic[T]): Data: list[T] class BitwardenBaseModel(PermissiveBaseModel): - bitwarden_client: BitwardenAPIClient | None = Field( - default=None, validate_default=True, exclude=True - ) + _bitwarden_client: Any = PrivateAttr(default=None) - @field_validator("bitwarden_client") + @model_validator(mode="wrap") @classmethod - def set_client(cls, v, info: FieldValidationInfo): - if v is None and info.context is not None: - return info.context.get("client") + def val_set_client( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + v = handler(data) + v._bitwarden_client = ctx.client return v @property - def api_client(self) -> BitwardenAPIClient: - assert self.bitwarden_client is not None - return self.bitwarden_client - + def api_client(self) -> "BitwardenAPIClient": + assert self._bitwarden_client is not None + return self._bitwarden_client -def decode_bytes( - value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo -) -> bytes: - context: dict = cast("dict", info.context) - keys: list[bytes] = cast("list[bytes]", context.get("cctx")) - for key in keys[::-1]: - try: - return decrypt(handler(value), key) - except Exception: - continue - raise ValueError("No key found") - -def decode_string( - value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo -) -> str: - return decode_bytes(value, handler, info=info).decode("utf-8") +class UriMatchDetection(IntEnum): + BASEDOMAIN = 0 + HOST = 1 + STARTSWITH = 2 + EXACT = 3 + RE = 4 + NEVER = 5 class UriMatch(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") - match: int | None = None - uri: Annotated[str, WrapValidator(decode_string)] | None = None - uriChecksum: Annotated[str, WrapValidator(decode_string)] | None = None + match: UriMatchDetection | None = None + uri: SecretString | None = None + uriChecksum: SecretString | None = None response: str | None = None + def uri_match(self, name: str) -> bool: + import re + import urllib.parse + + if self.uri is None: + return False + m = self.match if self.match is not None else UriMatchDetection.HOST + match m: + case UriMatchDetection.BASEDOMAIN: + url = urllib.parse.urlparse(name) + if url.hostname is None: + return False + basename = ".".join(url.hostname.split(".")[1:]) + hostname = urllib.parse.urlparse(self.uri).hostname + return hostname == basename + case UriMatchDetection.HOST: + url = urllib.parse.urlparse(self.uri) + hostname = urllib.parse.urlparse(name).hostname + return hostname == url.hostname + case UriMatchDetection.STARTSWITH: + return name.startswith(self.uri) + case UriMatchDetection.EXACT: + return name == self.uri + case UriMatchDetection.RE: + return re.match(self.uri, name) is not None + case UriMatchDetection.NEVER: + return False + class XField(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") - name: Annotated[str, WrapValidator(decode_string)] | None = None - response: Annotated[str, WrapValidator(decode_string)] | None = None + name: SecretString | None = None + response: SecretString | None = None type: int - value: Annotated[str, WrapValidator(decode_string)] | None = None + value: SecretString | None = None linkedId: str | None = None -class CipherLogin(BitwardenBaseModel): - class Config: - extra = "forbid" - - name: Annotated[str, WrapValidator(decode_string)] | None = None - autofillOnPageLoad: bool | None = None - password: Annotated[str, WrapValidator(decode_string)] | None = None - passwordRevisionDate: datetime.datetime | None = None - totp: str | None = None - uri: Annotated[str, WrapValidator(decode_string)] | None = None - uris: list[UriMatch] | None = None - username: Annotated[str, WrapValidator(decode_string)] | None = None - notes: Annotated[str, WrapValidator(decode_string)] | None = None - - class PasswordChange(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") lastUsedDate: datetime.datetime - password: str + password: SecretString class Fido2Credential(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") - counter: Annotated[str, WrapValidator(decode_string)] | None = None + counter: SecretString | None = None creationDate: datetime.datetime | None = None - credentialId: Annotated[str, WrapValidator(decode_string)] | None = None - discoverable: Annotated[str, WrapValidator(decode_string)] | None = None - keyAlgorithm: Annotated[str, WrapValidator(decode_string)] | None = None - keyCurve: Annotated[str, WrapValidator(decode_string)] | None = None - keyType: Annotated[str, WrapValidator(decode_string)] | None = None - keyValue: Annotated[str, WrapValidator(decode_string)] | None = None - response: str | None = None - rpId: Annotated[str, WrapValidator(decode_string)] | None = None - rpName: Annotated[str, WrapValidator(decode_string)] | None = None - userDisplayName: Annotated[str, WrapValidator(decode_string)] | None = None - userHandle: Annotated[str, WrapValidator(decode_string)] | None = None - userName: Annotated[str, WrapValidator(decode_string)] | None = None - - -class LoginData(CipherLogin): - class Config: - extra = "forbid" - - fields: list[XField] | None = None - passwordHistory: list[PasswordChange] | None = None + credentialId: SecretString | None = None + discoverable: SecretString | None = None + keyAlgorithm: SecretString | None = None + keyCurve: SecretString | None = None + keyType: SecretString | None = None + keyValue: SecretString | None = None response: str | None = None - fido2Credentials: list[Fido2Credential] | None = None - - -class SecureNoteData(CipherLogin): - class Config: - extra = "forbid" + rpId: SecretString | None = None + rpName: SecretString | None = None + userDisplayName: SecretString | None = None + userHandle: SecretString | None = None + userName: SecretString | None = None - fields: list[XField] - passwordHistory: list[PasswordChange] - response: str | None = None - type: int | None = None +class AttachmentRequest(BitwardenBaseModel): + model_config = ConfigDict(extra="forbid") -class SecureNoteProperty(BitwardenBaseModel): - class Config: - extra = "forbid" - - name: Annotated[str, WrapValidator(decode_string)] | None = None - notes: Annotated[str, WrapValidator(decode_string)] | None = None - fields: list[XField] | None = None - passwordHistory: list[PasswordChange] | None = None - response: Annotated[str, WrapValidator(decode_string)] | None = None - type: int + Key: SecretBytes + fileName: SecretString + fileSize: int + adminRequest: bool | None = None class Attachment(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") - fileName: Annotated[str, WrapValidator(decode_string)] | None = None + Key: SecretBytes + fileName: SecretString | None = None id: str - key: str | None = ( - None # Annotated[str, WrapValidator(decodeBytes)]|None = None - ) - object: str + Object: str size: int sizeName: str url: str + def download(self): + v = self._bitwarden_client._http_client.get(self.url) + return BinarySymmetricCipher.decode(v.content, self.Key) + class _CipherBase(BitwardenBaseModel): - class Config: - extra = "forbid" + model_config = ConfigDict(extra="forbid") Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) Type: CipherType - Name: Annotated[str, WrapValidator(decode_string)] + Name: SecretString CollectionIds: list[UUID] - key: str | None = None - - organizationUseTotp: bool | None = None - creationDate: datetime.datetime | None = None - deletedDate: datetime.datetime | None = None - fields: list[XField] | None = None - - notes: Annotated[str, WrapValidator(decode_string)] | None = None - reprompt: int - revisionDate: str - sshKey: str | None - passwordHistory: list[PasswordChange] - object: str | None = None - attachments: list[Attachment] | None = None + Key: SecretKey | None = None + + OrganizationUseTotp: bool | None = None + CreationDate: datetime.datetime | None = None + DeletedDate: datetime.datetime | None = None + Fields: list[XField] | None = None + + Notes: SecretString | None = None + Reprompt: int | None = None + ArchivedDate: str | None = None + RevisionDate: str | None = None + sshKey: str | None = None + Object: str | None = None + Attachments: list[Attachment] | None = None + + Edit: bool | None = None + Favorite: bool | None = None + FolderId: UUID | None = None + Permissions: Any | None = None + PasswordHistory: list[PasswordChange] | None = None + ViewPassword: bool | None = None + + Login: None = None + SecureNote: None = None + Card: None = None + Identity: None = None + + Data: Any | None = None @model_validator(mode="wrap") @classmethod - def set_key( + def val_set_key( cls, data: Any, handler: ModelWrapValidatorHandler[Self], info: ValidationInfo, ) -> Self: - if (key := data.get("key")) is not None: - context = cast("dict", info.context) - cctx = cast("list[bytes]", context.get("cctx")) - - cctx.append(decrypt(key, cctx[0])) + assert isinstance(info.context, CryptoContext) + + ctx: CryptoContext = cast(CryptoContext, info.context) + + assert ctx.client._sync and ctx.client._sync.Profile + + if ( + o := data.get("organizationId") or data.get("OrganizationId") + ) is not None: + oid = UUID(o) + org: typing.Optional["ProfileOrganization"] = None + for org in ctx.client._sync.Profile.Organizations: + if oid == org.Id: + assert org.Key + ctx.push(org.Key) + break + else: + raise ValueError(f"No organization found {oid}") + else: + assert ctx.client._connect_token + ctx.push(ctx.client._connect_token.Key) + r = val_set_key(cls, data, handler, info) - v = handler(data) + ctx.pop() - if key is not None: - cctx.pop() + return r - return v + @model_serializer(mode="wrap") + def ser_set_key( + self, handler: SerializerFunctionWrapHandler, info: SerializationInfo + ) -> Any: + return ser_set_key(self, handler, info) @field_validator("OrganizationId") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def add_collections(self, collections: list[UUID]): @@ -277,6 +358,22 @@ def remove_collections(self, collections: list[UUID]): json={"collectionIds": dump}, ) + def collections(self): + org: Organization | None = ( + get_organization(self._bitwarden_client, self.OrganizationId) + if self.OrganizationId + else None + ) + if org is None: + return [] + cd: dict[UUID, OrganizationCollection] = { + o.Id: o for o in org.collections() + } + colls: list[OrganizationCollection] = [ + cd[i] for i in self.CollectionIds + ] + return colls + def delete(self): return self.api_client.api_request("DELETE", f"api/ciphers/{self.Id}") @@ -289,55 +386,166 @@ def update_collection(self, collections: list[UUID]): json={"collectionIds": dump}, ) + def attach(self, path: Path): + with path.open("rb") as f: + self._attach(path.name, f) + + def _attach(self, name: str, file: io.IOBase): + "/api/ciphers/fc246fe5-9177-455b-b318-c00fab407dc8/attachment/v2" + key = token_bytes(64) + ed = BinarySymmetricCipher.encode(file.read(), key) + ar = AttachmentRequest.model_construct( + Key=key, fileName=name, fileSize=len(ed), adminRequest=True + ) + if self.Key: + stack = [self.Key] + elif self.OrganizationId: + stack = [ + get_organization( + self._bitwarden_client, self.OrganizationId + ).key() + ] + else: + stack = [self._bitwarden_client._connect_token._masterKey] + ard = ar.model_dump( + mode="json", + context=CryptoContext(client=self._bitwarden_client, stack=stack), + ) + v = self._bitwarden_client._api_request( + "POST", f"api/ciphers/{self.Id}/attachment/v2", json=ard + ).json() + self._bitwarden_client._api_request( + "POST", + "api" + v["url"], + files={ + "data": ( + ard["fileName"], + io.BytesIO(ed), + "application/octet-stream", + ) + }, + ) + + def uri_match(self, name: str) -> bool: + return False + + def save(self): + self._bitwarden_client.edit_item(self) + + +class LoginData(BitwardenBaseModel): + username: SecretString | None = None + password: SecretString | None = None + passwordRevisionDate: datetime.datetime | None = None + Uri: SecretString | None = None + Uris: list[UriMatch] | None = None + PasswordHistory: list[PasswordChange] | None = None + response: str | None = None + fido2Credentials: list[Fido2Credential] | None = None + + autofillOnPageLoad: bool | None = None + totp: SecretString | None = None + + def uri_match(self, name: str) -> bool: + if self.Uri and self.Uri == name: + return True + + if self.Uris: + for um in self.Uris: + if um.uri_match(name): + return True + return False + class Login(_CipherBase): - Type: Literal[CipherType.Login] + Type: Literal[CipherType.Login] = CipherType.Login + + Login: LoginData | None = None # type: ignore - login: LoginData | None = None - secureNote: None = None - card: None = None - identity: None = None + def uri_match(self, name: str) -> bool: + if self.Login: + return self.Login.uri_match(name) + return False - data: LoginData | None = None + +class SecureNoteData(BitwardenBaseModel): + Fields: list[XField] | None = None + + Notes: SecretString | None = None + + response: str | None = None + type: int | None = None class SecureNote(_CipherBase): - Type: Literal[CipherType.SecureNote] + Type: Literal[CipherType.SecureNote] = CipherType.SecureNote + SecureNote: SecureNoteData | None = None # type: ignore + - login: None = None - secureNote: SecureNoteProperty | None = None - card: None = None - identity: None = None +class CardData(BitwardenBaseModel): + Fields: list[XField] | None = None - data: SecureNoteData | None = None + cardholderName: SecretString | None = None + brand: SecretString | None = None + code: SecretString | None = None + expMonth: SecretString | None = None + expYear: SecretString | None = None + number: SecretString | None = None class Card(_CipherBase): - Type: Literal[CipherType.Card] + Type: Literal[CipherType.Card] = CipherType.Card + Card: CardData | None = None # type: ignore - login: None = None - card: None = None - secureNote: None = None - identity: None = None - data: None = None +class IdentityData(BitwardenBaseModel): + Fields: list[XField] | None = None + + title: SecretString | None = None + firstName: SecretString | None = None + middleName: SecretString | None = None + lastName: SecretString | None = None + username: SecretString | None = None + company: SecretString | None = None + + ssn: SecretString | None = None + passportNumber: SecretString | None = None + licenseNumber: SecretString | None = None + + email: SecretString | None = None + phone: SecretString | None = None + address1: SecretString | None = None + address2: SecretString | None = None + address3: SecretString | None = None + city: SecretString | None = None + state: SecretString | None = None + postalCode: SecretString | None = None + country: SecretString | None = None class Identity(_CipherBase): - Type: Literal[CipherType.Identity] + Type: Literal[CipherType.Identity] = CipherType.Identity + Identity: IdentityData = None # type: ignore + + +class SSHKeyData(BitwardenBaseModel): + keyFingerprint: SecretString | None = None + privateKey: SecretString | None = None + publicKey: SecretString | None = None - login: None = None - secureNote: None = None - card: None = None - identity: None = None - data: None = None +class SSHKey(_CipherBase): + Type: Literal[CipherType.SSHKey] = CipherType.SSHKey + sshKey: SSHKeyData = None # type: ignore CipherDetails = Annotated[ - Union[Login, SecureNote, Card, Identity], Field(discriminator="Type") + Union[Login, SecureNote, Card, Identity, SSHKey], + Field(discriminator="Type"), ] +CipherDetail: TypeAdapter[CipherDetails] = TypeAdapter(CipherDetails) + class CollectionAccess(BitwardenBaseModel): ReadOnly: bool = False @@ -355,9 +563,10 @@ class CollectionUser(CollectionAccess): @field_validator("CollectionId") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v @@ -371,23 +580,25 @@ class UserCollection(CollectionAccess): @field_validator("UserId") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v class OrganizationCollection(BitwardenBaseModel): Id: UUID | None = None OrganizationId: UUID | None = Field(None, validate_default=True) - Name: str + Name: SecretString ExternalId: str | None = None @field_validator("OrganizationId") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def users(self) -> list[CollectionUser]: @@ -398,7 +609,7 @@ def users(self) -> list[CollectionUser]: ) return TypeAdapter(list[CollectionUser]).validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) def set_users( @@ -413,9 +624,7 @@ def set_users( if isinstance(users[0], CollectionUser): users = cast("list[CollectionUser]", users) users_payload = [ - user.model_dump( - exclude={"CollectionId"}, by_alias=True, mode="json" - ) + user.model_dump(exclude={"CollectionId"}, mode="json") for user in users ] else: @@ -443,6 +652,38 @@ def delete(self): ) +class UserPublicKey(BitwardenBaseModel): + """ + c.f. https://github.com/dani-garcia/vaultwarden/blob/d6a3d539ed13352085ca7dfa63c49017d86c419b/src/api/core/accounts.rs#L471 + + """ + + userId: UUID + publicKey: RSAPublicKey + object: Literal["userKey"] + + +class ConfirmData(BitwardenBaseModel): + Id: UUID | None = None + Key: SecretOrganizationKey | None + + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + return val_set_key(cls, data, handler, info) + + @model_serializer(mode="wrap") + def ser_set_key( + self, handler: SerializerFunctionWrapHandler, info: SerializationInfo + ) -> Any: + return ser_set_key(self, handler, info) + + class OrganizationUserDetails(BitwardenBaseModel): Id: UUID | None = None Email: str @@ -460,9 +701,10 @@ class OrganizationUserDetails(BitwardenBaseModel): @field_validator("OrganizationId") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def add_collections(self, collections: list[UUID]): @@ -470,14 +712,14 @@ def add_collections(self, collections: list[UUID]): for collection in collections: if collection in _current_collections: continue - user = UserCollection( + user = UserCollection.model_construct( CollectionId=collection, UserId=self.Id, ReadOnly=False, HidePasswords=False, Manage=False, ) - user.bitwarden_client = self.api_client + user._bitwarden_client = self.api_client self.Collections.append(user) pl = self.model_dump( include={ @@ -495,7 +737,6 @@ def add_collections(self, collections: list[UUID]): exclude={ "Permissions": self.Permissions is None, }, - by_alias=True, mode="json", ) return ( @@ -530,7 +771,6 @@ def remove_collections(self, collections: list[UUID]): exclude={ "Permissions": self.Permissions is None, }, - by_alias=True, mode="json", ) return self.api_client.api_request( @@ -568,11 +808,22 @@ def update_collection(self, collections: list[UUID]): exclude={ "Permissions": self.Permissions is None, }, - by_alias=True, mode="json", ), ) + def publicKey(self) -> RSA.RsaKey: # noqa: N802 + """ + c.f. https://github.com/dani-garcia/vaultwarden/blob/d6a3d539ed13352085ca7dfa63c49017d86c419b/src/api/core/accounts.rs#L471 + :return: + """ + resp = self.api_client.api_request( + "GET", f"api/users/{self.UserId}/public-key" + ) + return UserPublicKey.model_validate_json( + resp.text, context=CryptoContext(client=self.api_client) + ).publicKey + def delete(self): return self.api_client.api_request( "DELETE", @@ -596,9 +847,10 @@ class Organization(BitwardenBaseModel): @field_validator("Id") @classmethod - def set_id(cls, v, info: FieldValidationInfo): + def set_id(cls, v, info: ValidationInfo): if v is None and info.context is not None: - return info.context.get("parent_id") + ctx: CryptoContext = cast(CryptoContext, info.context) + return ctx.parent_id return v def rename(self, new_name: str): @@ -638,7 +890,6 @@ def invite( ex: dict[str, Literal[True]] = {"UserId": True} collections_payload.append( coll.model_dump( - by_alias=True, mode="json", exclude=ex, ) @@ -674,6 +925,27 @@ def invite( self._users = self._get_users() return resp + def confirm(self, user: OrganizationUserDetails): + """ + c.f. https://github.com/dani-garcia/vaultwarden/blob/d6a3d539ed13352085ca7dfa63c49017d86c419b/src/api/core/organizations.rs#L1382 + :param new_user: + :return: + """ + + publicKey = user.publicKey() # noqa: N806 + + confirm = ConfirmData.model_construct(Key=self.key()) + payload = confirm.model_dump( + mode="json", + context=CryptoContext(client=self.api_client, stack=[publicKey]), + ) + resp = self.api_client.api_request( + "POST", + f"api/organizations/{self.Id}/users/{user.Id}/confirm", + json=payload, + ) + return resp + def _get_users(self) -> list[OrganizationUserDetails]: resp = self.api_client.api_request( "GET", @@ -684,10 +956,9 @@ def _get_users(self) -> list[OrganizationUserDetails]: ResplistBitwarden[OrganizationUserDetails] .model_validate_json( resp.text, - context={ - "parent_id": self.Id, - "client": self.api_client, - }, + context=CryptoContext( + client=self.api_client, parent_id=self.Id + ), ) .Data ) @@ -720,7 +991,7 @@ def user(self, user_id: UUID) -> OrganizationUserDetails: ) return OrganizationUserDetails.model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) def user_search( @@ -740,12 +1011,10 @@ def _get_collections(self) -> list[OrganizationCollection]: ) res = ResplistBitwarden[OrganizationCollection].model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext( + client=self.api_client, parent_id=self.Id, stack=[self.key()] + ), ) - org_key = self.key() - # map each collection name to the decrypted name - for collection in res.Data: - collection.Name = decrypt(collection.Name, org_key).decode("utf-8") return res.Data def collections( @@ -760,7 +1029,9 @@ def collections( def create_collection(self, name: str) -> OrganizationCollection: org_key = self.key() data = { - "name": encrypt(2, name, self.key()), + "name": SymmetricCipher.encode( + name.encode("utf-8"), org_key + ).decode("utf-8"), "groups": [], "users": [], } @@ -769,9 +1040,10 @@ def create_collection(self, name: str) -> OrganizationCollection: ) res = OrganizationCollection.model_validate_json( resp.text, - context={"parent_id": self.Id, "client": self.api_client}, + context=CryptoContext( + client=self.api_client, parent_id=self.Id, stack=[org_key] + ), ) - res.Name = decrypt(res.Name, org_key).decode("utf-8") if self._collections is not None: self._collections.append(res) else: @@ -801,14 +1073,9 @@ def _get_ciphers(self) -> list[CipherDetails]: "api/ciphers/organization-details", params={"organizationId": self.Id}, ) - org_key = self.key() res = ResplistBitwarden[CipherDetails].model_validate_json( resp.text, - context={ - "parent_id": self.Id, - "client": self.api_client, - "cctx": [org_key], # crypto context - }, + context=CryptoContext(client=self.api_client, parent_id=self.Id), ) return res.Data @@ -831,42 +1098,176 @@ def ciphers( ] return self._ciphers - def key(self): - sync = self.api_client.sync() - for org in sync.Profile.Organizations: - if org.Id == self.Id: - break + def key(self) -> bytes: + for force_refresh in [False, True]: + sync = self.api_client.sync(force_refresh=force_refresh) + for org in sync.Profile.Organizations: + if org.Id == self.Id: + assert org and org.Key + return org.Key else: raise BitwardenError(f"No Organizations `{self.Id}` found") - return decrypt(org.Key, self.api_client.connect_token.orgs_key) + + def delete(self) -> None: + self.api_client.api_request( + "DELETE", + f"api/organizations/{self.Id}", + json=dict( + masterPasswordHash=self._bitwarden_client.masterPasswordHash + ), + ) def get_organization( - bitwarden_client, organisation_id: UUID | str + bitwarden_client: "BitwardenAPIClient", organisation_id: UUID | str ) -> Organization: + oid = ( + UUID(organisation_id) + if isinstance(organisation_id, str) + else organisation_id + ) + + if bitwarden_client._sync is not None: + for org in bitwarden_client._sync.Profile.Organizations: + if org.Id == oid: + r = Organization.model_construct( + Id=org.Id, Name=org.Name, BillingEmail="", Object="" + ) + r._bitwarden_client = bitwarden_client + return r + resp = bitwarden_client.api_request( "GET", f"api/organizations/{organisation_id}" ) return Organization.model_validate_json( resp.text, - context={"client": bitwarden_client, "parent_id": organisation_id}, + context=CryptoContext(client=bitwarden_client, parent_id=oid), ) -@dataclasses.dataclass -class Kdf: - Kdf: KdfType +class Kdf(PermissiveBaseModel): + Kdf: int KdfIterations: int | None = None KdfMemory: int | None = None KdfParallelism: int | None = None @classmethod - def from_connect_token( - cls, token: "vaultwarden.clients.bitwarden.ConnectToken" - ): - return cls( - token.Kdf, - token.KdfIterations, - token.KdfMemory, - token.KdfParallelism, + def argon2id(cls): + return cls.model_construct( + Kdf=KdfType.Argon2id, + KdfMemory=32, + KdfIterations=6, + KdfParallelism=4, + ) + + +class KeysData(BitwardenBaseModel): + encryptedPrivateKey: str + publicKey: str + + +class RegisterData(BitwardenBaseModel): + """ + c.f. https://bitwarden.com/help/bitwarden-security-white-paper/ + """ + + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + email: str + password: str = Field(exclude=True) + + name: str + Kdf: int + # key: str + + KdfIterations: int | None = None + KdfMemory: int | None = None + KdfParallelism: int | None = None + + # keys: KeysData | None = None + + masterPasswordHint: str | None = None + + @computed_field # type: ignore[prop-decorator] + @property + def masterPasswordHash(self) -> str: # noqa: N802 + return masterPasswordHash(self._masterKey, self.password) + + @computed_field # type: ignore[prop-decorator] + @property + def key(self) -> str: + return SymmetricCipher.encode(self._rawKey, self._masterKey).decode() + + @computed_field # type: ignore[prop-decorator] + @property + def keys(self) -> KeysData: + return KeysData.model_construct( + encryptedPrivateKey=SymmetricCipher.encode( + self._rawKeys.exportKey("DER", pkcs=8), self._rawKey + ).decode(), + publicKey=base64.b64encode( + self._rawKeys.publickey().exportKey("DER") + ).decode(), + ) + + @cached_property + def _masterKey(self) -> bytes: # noqa: N802 + return make_master_key( + self.password, + self.email, + Kdf.model_construct( + Kdf=self.Kdf, + KdfIterations=self.KdfIterations, + KdfMemory=self.KdfMemory, + KdfParallelism=self.KdfParallelism, + ), + ) + + @cached_property + def _stretchedKey(self) -> bytes: # noqa: N802 + return stretch_key(self._masterKey) + + @cached_property + def _rawKey(self) -> bytes: # noqa: N802 + return token_bytes(64) + + @cached_property + def _rawKeys(self) -> RSA.RsaKey: # noqa: N802 + return RSA.generate(2048) + + +class OrgData(BitwardenBaseModel): + """ + c.f. https://github.com/dani-garcia/vaultwarden/blob/d6a3d539ed13352085ca7dfa63c49017d86c419b/src/api/core/organizations.rs#L109-L119 + """ + + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + BillingEmail: str + CollectionName: SecretString + Key: SecretOrganizationKey + Name: str + # Keys: KeysData + PlanType: int | str + + @computed_field(alias="keys") # type: ignore[prop-decorator] + @property + def Keys(self) -> KeysData: # noqa: N802 + return KeysData.model_construct( + encryptedPrivateKey=SymmetricCipher.encode( + self._rawKeys.exportKey("DER", pkcs=8), self.Key + ).decode(), + publicKey=base64.b64encode( + self._rawKeys.publickey().exportKey("DER") + ).decode(), ) + + @cached_property + def _rawKeys(self) -> RSA.RsaKey: # noqa: N802 + return RSA.generate(2048) + + @model_serializer(mode="wrap") + def ser_set_key( + self, handler: SerializerFunctionWrapHandler, info: SerializationInfo + ) -> Any: + return ser_set_key(self, handler, info) diff --git a/src/vaultwarden/models/crypto.py b/src/vaultwarden/models/crypto.py new file mode 100644 index 0000000..ea797de --- /dev/null +++ b/src/vaultwarden/models/crypto.py @@ -0,0 +1,178 @@ +from base64 import b64decode +import dataclasses +import typing +from typing import TypeAlias, cast +from uuid import UUID + +from Crypto.PublicKey import RSA +from pydantic import ( + SerializationInfo, + SerializerFunctionWrapHandler, + ValidationInfo, + ValidatorFunctionWrapHandler, + WrapSerializer, + WrapValidator, +) +from typing_extensions import Annotated + +from vaultwarden.utils.crypto import AsymmetricCipher, SymmetricCipher + +if typing.TYPE_CHECKING: + from vaultwarden.clients.bitwarden import BitwardenAPIClient + + +def decode_string( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> str: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-1])) + + +def encode_string( + value: str, handler: SerializerFunctionWrapHandler, info: SerializationInfo +) -> str: + assert info.mode == "json" + ctx = cast(CryptoContext, info.context) + return handler( + SymmetricCipher.encode(value.encode(), ctx.stack[-1]).decode() + ) + + +SecretString = Annotated[ + str, WrapValidator(decode_string), WrapSerializer(encode_string) +] +""" +Symmetric encoded string value +""" + + +def decode_bytes( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-1])) + + +def encode_bytes( + value: bytes, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> str: + assert info.mode == "json" + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.encode(value, ctx.stack[-1])) + + +SecretBytes = Annotated[ + bytes, WrapValidator(decode_bytes), WrapSerializer(encode_bytes) +] +""" +Symmetric encoded bytes value +""" + + +def decode_rsa( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> RSA.RsaKey: + ctx = cast(CryptoContext, info.context) + return handler(RSA.importKey(SymmetricCipher.decode(value, ctx.stack[-1]))) + + +def encode_rsa( + value: RSA.RsaKey, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> str: + assert info.mode == "json" + ctx = cast(CryptoContext, info.context) + return handler( + SymmetricCipher.encode(value.exportKey("DER", pkcs=8), ctx.stack[-1]) + ) + + +SecretRSA = Annotated[ + RSA.RsaKey, WrapValidator(decode_rsa), WrapSerializer(encode_rsa) +] +""" +Symmetric encoded RSA key +""" + + +def decode_org_key( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(AsymmetricCipher.decode(value, ctx.stack[-2])) + + +def encode_org_key( + value: bytes, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> str: + assert info.mode == "json" + ctx = cast(CryptoContext, info.context) + return handler(AsymmetricCipher.encode(value, ctx.stack[-2])) + + +SecretOrganizationKey = Annotated[ + bytes, WrapValidator(decode_org_key), WrapSerializer(encode_org_key) +] +""" +Asymmetric encoded Key + +* key is not added to cctx +* encoding uses the seconds last key in cctx +""" + + +def decode_key( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> bytes: + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.decode(value, ctx.stack[-2])) + + +def encode_key( + value: bytes, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, +) -> str: + assert info.mode == "json" + ctx = cast(CryptoContext, info.context) + return handler(SymmetricCipher.encode(value, ctx.stack[-2])) + + +SecretKey = Annotated[ + bytes, WrapValidator(decode_key), WrapSerializer(encode_key) +] +""" +Symmetric encoded Key + +* the Key is added to cctx by ser_set_key / val_set_key of the model +* en/decoding uses the [-2] key in cctx +""" + +CryptoKey: TypeAlias = RSA.RsaKey | bytes + + +def decode_rsa_public_key( + value: str, handler: ValidatorFunctionWrapHandler, info: ValidationInfo +) -> RSA.RsaKey: + return handler(RSA.importKey(b64decode(value))) + + +RSAPublicKey = Annotated[RSA.RsaKey, WrapValidator(decode_rsa_public_key)] + + +@dataclasses.dataclass +class CryptoContext: + client: "BitwardenAPIClient" + parent_id: UUID | None = None + stack: list[CryptoKey] = dataclasses.field(default_factory=list) + + def push(self, v: CryptoKey) -> None: + return self.stack.append(v) + + def pop(self) -> CryptoKey: + return self.stack.pop() diff --git a/src/vaultwarden/models/enum.py b/src/vaultwarden/models/enum.py index ece782e..e60bfa8 100644 --- a/src/vaultwarden/models/enum.py +++ b/src/vaultwarden/models/enum.py @@ -21,6 +21,7 @@ class CipherType(IntEnum): SecureNote = 2 Card = 3 Identity = 4 + SSHKey = 5 class VaultwardenUserStatus(IntEnum): diff --git a/src/vaultwarden/models/permissive_model.py b/src/vaultwarden/models/permissive_model.py index 9839a0c..642ba6b 100644 --- a/src/vaultwarden/models/permissive_model.py +++ b/src/vaultwarden/models/permissive_model.py @@ -9,5 +9,6 @@ class PermissiveBaseModel( alias_generator=pascal_case_to_camel_case, populate_by_name=True, arbitrary_types_allowed=True, + serialize_by_alias=True, ): pass diff --git a/src/vaultwarden/models/sync.py b/src/vaultwarden/models/sync.py index 3044ce2..9b35fb8 100644 --- a/src/vaultwarden/models/sync.py +++ b/src/vaultwarden/models/sync.py @@ -1,11 +1,32 @@ +import sys import time +from typing import Any, cast from uuid import UUID -from pydantic import AliasChoices, Field, field_validator - +from pydantic import ( + AliasChoices, + Field, + ModelWrapValidatorHandler, + PrivateAttr, + ValidationInfo, + field_validator, + model_validator, +) + +from vaultwarden.models.bitwarden import CipherDetails, val_set_key +from vaultwarden.models.crypto import ( + CryptoContext, + SecretKey, + SecretOrganizationKey, + SecretRSA, +) from vaultwarden.models.enum import KdfType, VaultwardenUserStatus from vaultwarden.models.permissive_model import PermissiveBaseModel -from vaultwarden.utils.crypto import decrypt + +if sys.version_info < (3, 11): + from typing_extensions import Self +else: + from typing import Self class ConnectToken(PermissiveBaseModel): @@ -13,8 +34,8 @@ class ConnectToken(PermissiveBaseModel): KdfIterations: int = 0 KdfMemory: int | None = None KdfParallelism: int | None = None - Key: str - PrivateKey: str + Key: SecretKey + PrivateKey: SecretRSA access_token: str refresh_token: str | None = None expires_in: int @@ -23,7 +44,7 @@ class ConnectToken(PermissiveBaseModel): unofficialServer: bool = False ResetMasterPassword: bool | None = None - master_key: bytes | None = None # pydantic.PrivateAttr(default=None) + _master_key: bytes | None = PrivateAttr(default=None) @field_validator("expires_in") @classmethod @@ -35,19 +56,39 @@ def is_expired(self, now=None): now = time.time() return (self.expires_in is not None) and (self.expires_in <= now) - @property - def user_key(self): - return decrypt(self.Key, self.master_key) - - @property - def orgs_key(self): - return decrypt(self.PrivateKey, self.user_key) - - -class ProfileOrganization(PermissiveBaseModel): + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + from vaultwarden.models.bitwarden import Kdf + from vaultwarden.models.crypto import CryptoContext + from vaultwarden.utils.crypto import make_master_key + + assert info and info.context + + ctx = cast(CryptoContext, info.context) + assert ctx.client.email is not None + + master_key = make_master_key( + password=ctx.client.password, + salt=ctx.client.email, + kdf=Kdf.model_validate(data), + ) + ctx.push(master_key) + v = val_set_key(cls, data, handler, info) + ctx.pop() # master_key + v._master_key = master_key + return v + + +class _ProfileOrganization(PermissiveBaseModel): Id: UUID Name: str - Key: str | None = None + Key: SecretOrganizationKey | None = None ProviderId: str | None = None ProviderName: str | None = None ResetPasswordEnrolled: bool @@ -67,20 +108,32 @@ class ProfileOrganization(PermissiveBaseModel): UseTotp: bool -class UserProfile(PermissiveBaseModel): +class ProfileOrganization(_ProfileOrganization): + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + return val_set_key(cls, data, handler, info) + + +class _UserProfile(PermissiveBaseModel): AvatarColor: str | None Culture: str Email: str EmailVerified: bool ForcePasswordReset: bool Id: UUID - Key: str + Key: SecretKey MasterPasswordHint: str | None = None Name: str | None Object: str | None + PrivateKey: SecretRSA | None Organizations: list[ProfileOrganization] Premium: bool - PrivateKey: str | None ProviderOrganizations: list Providers: list SecurityStamp: str @@ -92,18 +145,75 @@ class UserProfile(PermissiveBaseModel): ) -class VaultwardenUser(UserProfile): +class UserProfile(_UserProfile): + @field_validator("Organizations", mode="wrap") + @classmethod + def val_field_Organizations( # noqa: N802 + cls, + v: str, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + if ( + key := info.data.get("PrivateKey") or info.data.get("privateKey") + ) is not None: + ctx.push(key) + r = handler(v) + if key: + ctx.pop() + return r + + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + return val_set_key(cls, data, handler, info) + + +class VaultwardenOrganization(_ProfileOrganization): + # overwrite + Key: str # type: ignore + + +class VaultwardenUser(_UserProfile): UserEnabled: bool CreatedAt: str LastActive: str | None = None + # overwrite + Key: str # type: ignore + PrivateKey: str | None # type: ignore + Organizations: list[VaultwardenOrganization] # type: ignore + -# TODO: add definition of attribute's types class SyncData(PermissiveBaseModel): - Ciphers: list[dict] + Profile: UserProfile + Ciphers: list[CipherDetails] Collections: list[dict] Domains: dict | None Folders: list[dict] Policies: list[dict] - Profile: UserProfile Sends: list[dict] + + @model_validator(mode="wrap") + @classmethod + def val_set_key( + cls, + data: Any, + handler: ModelWrapValidatorHandler[Self], + info: ValidationInfo, + ) -> Self: + ctx: CryptoContext = cast(CryptoContext, info.context) + + assert ( + ctx.client._connect_token and ctx.client._connect_token._master_key + ) + ctx.push(ctx.client._connect_token._master_key) + r = handler(data) + ctx.pop() + return r diff --git a/src/vaultwarden/utils/crypto.py b/src/vaultwarden/utils/crypto.py index 7b47841..922eaae 100644 --- a/src/vaultwarden/utils/crypto.py +++ b/src/vaultwarden/utils/crypto.py @@ -2,13 +2,13 @@ # -*- coding: utf-8 -*- # Original source: # https://github.com/corpusops/bitwardentools/blob/main/src/bitwardentools/crypto.py -from __future__ import absolute_import, division, print_function import base64 import hashlib import re import secrets import string +import sys from base64 import b64decode, b64encode from enum import IntEnum from hashlib import pbkdf2_hmac, sha256 @@ -20,105 +20,195 @@ from Crypto.PublicKey import RSA from hkdf import hkdf_expand -if typing.TYPE_CHECKING: - import vaultwarden.models.bitwarden - -class CIPHERS(IntEnum): - sym = 2 - asym = 4 - - -CACHE = {} # type: ignore -ITERATIONS = 2000000 -ENCODED_CIPHER = { - CIPHERS.sym: "{typ}.{b64_iv}|{b64_ct}|{b64_digest}", - CIPHERS.asym: "{typ}.{b64_ct}", -} -ENCRYPTED_STRING_RE = re.compile("^[0-9][.].*=.*", flags=re.I | re.M) -SYM_ENCRYPTED_STRING_RE = re.compile( - "^2[.][^=]+=+[|][^=]+=+[|][^=]+=+", flags=re.I | re.M -) - - -class UnimplementedError(Exception): - """.""" - -class DecodeEncKeyError(ValueError): - """.""" - - -class WrongFormatError(DecodeEncKeyError): - """.""" +if sys.version_info < (3, 11): + from typing_extensions import Self +else: + from typing import Self -class WrongTypeDecryptError(DecodeEncKeyError): - """.""" +if typing.TYPE_CHECKING: + import vaultwarden.models.bitwarden -class MissingPartsDecryptError(DecodeEncKeyError): - """.""" +class CIPHERS(IntEnum): + null = 0 + sym = 2 + asym = 4 -class B64DecryptError(DecodeEncKeyError): - """.""" +class _Cipher: + TYPE: int + ENCODING: bytes + @classmethod + def encode(cls, plainbytes:bytes, key:bytes) -> bytes: + raise NotImplementedError() + + @classmethod + def decode(cls, data, key) -> bytes: + raise NotImplementedError() + + def _decrypt(self, data:bytes, key: bytes) -> bytes: + raise NotImplementedError() + +class AsymmetricCipher(_Cipher): + TYPE = CIPHERS.asym + ENCODING = b"%(typ)i.%(ct)b" + @classmethod + def _parse(cls, ct:str) -> tuple[Self, bytes]: + return cls(), b64decode(ct) + + def _decrypt(self, ct:bytes, key: RSA.RsaKey) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, RSA.RsaKey) + return PKCS1_OAEP.new(key).decrypt(ct) + + @classmethod + def encode(cls, plainbytes: bytes, key: RSA.RsaKey) -> bytes: + assert isinstance(plainbytes, bytes) + assert isinstance(key, RSA.RsaKey) + cipher = PKCS1_OAEP.new(key).encrypt(plainbytes) + return cls.ENCODING % {b"typ":cls.TYPE, b"ct":b64encode(cipher)} + + @classmethod + def decode(cls, data: str, key: RSA.RsaKey) -> bytes: + assert int(data[0]) == AsymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + +class SymmetricCipher(_Cipher): + TYPE = CIPHERS.sym + ENCODING = b"%(typ)i.%(iv)b|%(ct)b|%(digest)b" + def __init__(self, iv:bytes, mac:bytes): + self._iv = iv + self._mac = mac + + @classmethod + def _parse(cls, ct: str) -> tuple[Self, bytes]: + iv, ct, mac = ct.split("|", 3) + return cls(b64decode(iv), b64decode(mac)[0:32]), b64decode(ct) + + def _decrypt(self, ct: bytes, key: bytes) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, bytes) + enc, mac = SymmetricCipher._get_enc_mac(key) + hdmac = hmac_new(mac, self._iv + ct, sha256).digest() + if hdmac != self._mac: + raise DecryptError( + f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(self._mac).hex()}. Check your password." + ) + c = AES.new(enc, AES.MODE_CBC, self._iv) + plaintext = c.decrypt(ct) + pad_len = plaintext[-1] + padding = bytes([pad_len] * pad_len) + if plaintext[-pad_len:] == padding: + plaintext = plaintext[:-pad_len] + return plaintext + + + @classmethod + def encode(cls, plainbytes: bytes, key: bytes) -> bytes: + assert isinstance(plainbytes, bytes) + assert isinstance(key, bytes) + # inspired from bitwarden/jslib:src/services/crypto.service.ts + (iv, ct, mac) = aes_encrypt(plainbytes, key) + return cls.ENCODING % {b"typ":cls.TYPE, b"iv":b64encode(iv), b"ct":b64encode(ct), b"digest":b64encode(mac)} + + @classmethod + def decode(cls, data: str, key: bytes) -> bytes: + assert int(data[0]) == SymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + + + @staticmethod + def _get_enc_mac(key:bytes) -> tuple[bytes, bytes]: + assert isinstance(key, bytes) + # + match len(key): + case 32: + """symmetric master_key of the user""" + enc = hkdf_expand(key, b"enc", 32, sha256) + mac = hkdf_expand(key, b"mac", 32, sha256) + case 64: + """symmetric key of an organization""" + enc = key[:32] + mac = key[32:] + case _: + raise ValueError(f"Invalid key type {key!r}") + return enc, mac + + +class BinarySymmetricCipher: + TYPE = CIPHERS.sym + ENCODING = b"%(typ)c%(iv)16b%(mac)32b%(ct)b" + + def __init__(self, iv:bytes, mac:bytes): + self._iv = iv + self._mac = mac + + @classmethod + def _parse(cls, cipher_bytes: bytes) -> tuple[Self, bytes]: + iv = cipher_bytes[0:16] + mac = cipher_bytes[16:48] + ct = cipher_bytes[48:] + return cls(iv, mac), ct + + def _decrypt(self, ct: bytes, key: bytes) -> bytes: + assert isinstance(ct, bytes) + assert isinstance(key, bytes) + enc, mac = SymmetricCipher._get_enc_mac(key) + hdmac = hmac_new(mac, self._iv + ct, sha256).digest() + if hdmac != self._mac: + raise DecryptError( + f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(self._mac).hex()}. Check your password." + ) + c = AES.new(enc, AES.MODE_CBC, self._iv) + plaintext = c.decrypt(ct) + pad_len = plaintext[-1] + padding = bytes([pad_len] * pad_len) + if plaintext[-pad_len:] == padding: + plaintext = plaintext[:-pad_len] + return plaintext + + @classmethod + def decode(cls, data: bytes, key: bytes) -> bytes: + assert isinstance(data, bytes) + assert isinstance(key, bytes) + assert int(data[0]) == SymmetricCipher.TYPE + cipher, ct = cls._parse(data[1:]) + return cipher._decrypt(ct, key) + + + @classmethod + def encode(cls, plainbytes: bytes, key: bytes) -> bytes: + assert isinstance(plainbytes, bytes) + assert isinstance(key, bytes) + # inspired from bitwarden/jslib:src/services/crypto.service.ts + (iv, ct, mac) = aes_encrypt(plainbytes, key) + return cls.ENCODING % {b"typ": cls.TYPE, b"iv": iv, b"mac": mac, b"ct": ct} + + + +class NullCipher(_Cipher): + TYPE = CIPHERS.null + def __init__(self, iv, ct): + self._iv = iv + self._ct = ct + + @classmethod + def parse(cls, ct): + iv, ct, mac = ct.split("|", 2) + iv = b64decode(iv) + ct = b64decode(ct) + return cls(iv), ct class DecryptError(ValueError): """.""" -def decode_cipher_string(cipher_string): - """decode a cipher tring into it's parts""" - iv = None - mac = None - assert cipher_string is not None - if not ENCRYPTED_STRING_RE.match(cipher_string): - raise WrongFormatError(f"{cipher_string}") - try: - typ = cipher_string[0:1] - typ = int(typ) - assert typ < 9 - except (AssertionError, ValueError): - raise WrongTypeDecryptError(f"{typ} is not valid") - ct = cipher_string[2:] - if typ == CIPHERS.asym: - pass - else: - try: - if typ == 0: - iv, ct = ct.split("|", 2) - else: - iv, ct, mac = ct.split("|", 3) - except Exception: - raise MissingPartsDecryptError(f"{ct} is missing parts") - if iv: - try: - iv = b64decode(iv) - except Exception: - raise B64DecryptError(f"iv {iv} not valid") - if mac: - try: - mac = b64decode(mac)[0:32] - except Exception: - raise B64DecryptError(f"mac {mac} not valid") - try: - ct = b64decode(ct) - except Exception: - raise B64DecryptError(f"ct {ct} not valid") - return typ, iv, ct, mac - - -def is_encrypted(cipher_string): - try: - decode_cipher_string(cipher_string) - except DecodeEncKeyError: - return False - else: - return True - - -def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf"): +def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden.Kdf") -> bytes: import vaultwarden.models.bitwarden assert isinstance(salt, str) @@ -149,31 +239,16 @@ def make_master_key(password: str, salt: str, kdf: "vaultwarden.models.bitwarden type=argon2.Type.ID, ) return v + case _: + raise ValueError(f"unsupported kdf {kdf}") + + +def aes_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes, bytes]: + assert isinstance(plaintext, bytes) + assert isinstance(key, bytes) + + enc, mac = SymmetricCipher._get_enc_mac(key) -def hash_password(password, salt, iterations=ITERATIONS): - """base64-encode a wrapped, stretched password+salt(email) for signup/login""" - if not hasattr(password, "decode"): - password = password.encode("utf-8") - master_key = make_master_key(password, salt, iterations) - hashpw = hashlib.pbkdf2_hmac("sha256", master_key, password, 1) - return base64.b64encode(hashpw), master_key - - -def load_rsa_key(key): - rsakeys = CACHE.setdefault("rsa", {}) - if not isinstance(key, RSA.RsaKey): - try: - key = rsakeys[key] - except KeyError: - rsakeys[key] = RSA.importKey(key) - key = rsakeys[key] - return key - - -def aes_encrypt(plaintext, key, charset="utf-8"): - enc, mac = get_sym_enc_mac(key) - if not hasattr(plaintext, "decode"): - plaintext = plaintext.encode(charset) pad_len = 16 - len(plaintext) % 16 padding = bytes([pad_len] * pad_len) content = plaintext + padding @@ -181,106 +256,10 @@ def aes_encrypt(plaintext, key, charset="utf-8"): c = AES.new(enc, AES.MODE_CBC, iv) ct = c.encrypt(content) cmac = hmac_new(mac, iv + ct, sha256) - return iv, ct, cmac - - -def encrypt_sym(plaintext, key, to_bytes=False, *a, **kw): - # inspired from bitwarden/jslib:src/services/crypto.service.ts - typ, (iv, ct, mac) = int(CIPHERS.sym), aes_encrypt(plaintext, key, *a, **kw) - if mac: - mac = mac.digest() - if to_bytes: - # jslib: encryptToBytes() - ret = chr(typ).encode() - ret += iv - if mac: - ret += mac - ret += ct - else: - # jslib: encrypt() - b64_iv = b64encode(iv).decode() - b64_ct = b64encode(ct).decode() - b64_digest = "" - if mac: - b64_digest = b64encode(mac).decode() - ret = ENCODED_CIPHER[typ].format(**locals()) - return ret - - -def encrypt_sym_to_bytes(plaintext, key, *a, **kw): - kw["to_bytes"] = True - return encrypt_sym(plaintext, key, *a, **kw) - - -def encrypt_asym(plaintext, key, *a, **kw): - cipher = PKCS1_OAEP.new(load_rsa_key(key)).encrypt(plaintext) - b64_ct = b64encode(cipher).decode() - typ = CIPHERS.asym - return ENCODED_CIPHER[typ].format(**locals()) - - -def encrypt(typ, plaintext, key, *a, **kw): - try: - enc = ENCRYPT[typ] - except KeyError: - raise UnimplementedError(f"can not encrypt type:{typ}") - return enc(plaintext=plaintext, key=key, *a, **kw) - - -def get_sym_enc_mac(key): - # symmetric master_key of the user - if len(key) == 32: - enc = hkdf_expand(key, b"enc", 32, sha256) - mac = hkdf_expand(key, b"mac", 32, sha256) - # symmetric key of an organization - elif len(key) == 64: - enc = key[:32] - mac = key[32:] - return enc, mac - - -def decrypt_sym(dct, key, div, dmac, *a, **kw): - enc, mac = get_sym_enc_mac(key) - hdmac = hmac_new(mac, div + dct, sha256).digest() - if hdmac != dmac: - raise DecryptError( - f"Symmetric hmac verification failed {bytes(hdmac).hex()} / {bytes(dmac).hex()}. Check your password." - ) - c = AES.new(enc, AES.MODE_CBC, div) - plaintext = c.decrypt(dct) - pad_len = plaintext[-1] - padding = bytes([pad_len] * pad_len) - if plaintext[-pad_len:] == padding: - plaintext = plaintext[:-pad_len] - return plaintext - - -def decrypt_asym(dct, key, *a, **kw): - return PKCS1_OAEP.new(load_rsa_key(key)).decrypt(dct) - - -def decrypt_bytes(cipher_bytes, key, *a, **kw): - ret, typ = None, cipher_bytes[0] - if typ in [2]: - iv = cipher_bytes[1:17] - mac = cipher_bytes[17:49] - ct = cipher_bytes[49:] - ret = decrypt_sym(ct, key, iv, mac) - else: - raise UnimplementedError(f"{typ} encType decryption is not implemented") - return ret - + return iv, ct, cmac.digest() -def decrypt(cipher_string, key, *a, **kw): - typ, iv, ct, mac = decode_cipher_string(cipher_string) - try: - dec = DECRYPT[typ] - except KeyError: - raise UnimplementedError(f"can not decrypt type:{typ}") - return dec(div=iv, dct=ct, dmac=mac, key=key, *a, **kw) - -def strech_key(key): +def stretch_key(key: bytes) -> bytes: stretched_key = key if len(stretched_key) < 64: stretched_key = hkdf_expand(key, b"enc", 32, sha256) + hkdf_expand( @@ -288,24 +267,14 @@ def strech_key(key): ) return stretched_key +def masterPasswordHash(masterKey: bytes, password: str) -> str: + v = hashlib.pbkdf2_hmac( + "sha256", masterKey, password.encode(), 1 + ) + return base64.b64encode(v).decode() -def make_sym_key(master_key): - stretched_key = strech_key(master_key) - plaintext = token_bytes(64) - return encrypt_sym(plaintext, stretched_key), plaintext - - -def make_asym_key(key, stretch=True): - if stretch: - key = strech_key(key) - asym_key = RSA.generate(2048) - public_key = asym_key.publickey().exportKey("DER") - private_key = asym_key.exportKey("DER", pkcs=8) - return encrypt_sym(private_key, key), public_key, private_key - - -def gen_password(length=32, alphabet=None): - alphabet = string.ascii_letters + string.digits +def gen_password(length=32, alphabet=None) -> str: # FIXME UNUSED + alphabet = alphabet or string.ascii_letters + string.digits while True: password = "".join(secrets.choice(alphabet) for i in range(length)) if ( @@ -317,6 +286,4 @@ def gen_password(length=32, alphabet=None): return password -DECRYPT = {CIPHERS.sym: decrypt_sym, CIPHERS.asym: decrypt_asym} -ENCRYPT = {CIPHERS.sym: encrypt_sym, CIPHERS.asym: encrypt_asym} # vim:set et sts=4 ts=4 tw=120: diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py index e69de29..6cd987f 100644 --- a/tests/e2e/__init__.py +++ b/tests/e2e/__init__.py @@ -0,0 +1,12 @@ +def env_from_ci(): + import os + from pathlib import Path + + if os.environ.get("BITWARDEN_URL", None) is not None: + return + + import yaml + + obj = yaml.safe_load(Path(".github/workflows/ci.yml").read_text()) + for k, v in obj["jobs"]["test"]["steps"][-1]["env"].items(): + os.environ[k] = v diff --git a/tests/e2e/test_bitwarden.py b/tests/e2e/test_bitwarden.py index 2a566a0..6151be9 100644 --- a/tests/e2e/test_bitwarden.py +++ b/tests/e2e/test_bitwarden.py @@ -1,8 +1,16 @@ import os +from pathlib import Path +import string import unittest from vaultwarden.clients.bitwarden import BitwardenAPIClient -from vaultwarden.models.bitwarden import get_organization +from vaultwarden.models.bitwarden import ( + get_organization, +) + +from . import env_from_ci + +env_from_ci() # Get Bitwarden credentials from environment variables url = os.environ.get("BITWARDEN_URL", None) @@ -12,6 +20,7 @@ client_secret = os.environ.get("BITWARDEN_CLIENT_SECRET", None) device_id = os.environ.get("BITWARDEN_DEVICE_ID", None) + # Get test organization id from environment variables test_organization = os.environ.get("BITWARDEN_TEST_ORGANIZATION", None) @@ -35,6 +44,8 @@ class BitwardenBaseTests: + bitwarden: BitwardenAPIClient + def setup_base(self): self.organization = get_organization(self.bitwarden, test_organization) self.test_colls_names = self.organization.collections(as_dict=True) @@ -120,7 +131,7 @@ def test_invite_user_than_remove(self): self.assertIsNotNone(user) user.delete() - def test_rename_organization(self): + def _test_rename_organization(self): old_name = self.organization.Name new_name = "new_test_organization" self.organization.rename(new_name) @@ -142,6 +153,82 @@ def test_deduplicate(self): # Todo build test fixtures and delete them at the end of the test return + def _test_create_user(self): + import random + + from vaultwarden.models.bitwarden import Kdf + from vaultwarden.utils.crypto import gen_password + + rnd = "".join( + random.choices(string.ascii_letters + string.digits, k=10) + ).lower() + self.bitwarden.create_user( + f"test+{rnd}@examle.org", + gen_password(), + "test user", + kdf=Kdf.argon2id(), + ) + + def _test_create_org_login(self): + from secrets import token_bytes + + from vaultwarden.models.bitwarden import Login, LoginData + + for name, key in [ + ("org - with key", token_bytes(64)), + ("org - no key", None), + ]: + data = LoginData.model_construct( + name=name, + password="test123", + username="test", + ) + item = Login.model_construct( + name=name, + login=data, + data=data, + key=key, + ) + self.bitwarden.create_item( + item, self.organization, collections=self.test_colls_ids + ) + + def _test_create_own_login(self): + from secrets import token_bytes + + from vaultwarden.models.bitwarden import Login, LoginData + + for name, key in [ + ("own - with key", token_bytes(64)), + ("own - no key", None), + ]: + data = LoginData.model_construct( + name=name, + password="test123", + username="test", + ) + item = Login.model_construct( + name=name, + login=data, + data=data, + key=key, + ) + self.bitwarden.create_item( + item, None, collections=self.test_colls_ids + ) + + def _test_create_attachment(self): + from vaultwarden.models.bitwarden import Login + + login: Login = next( + filter(lambda x: x.attachments, self.test_org_ciphers) + ) + login.attach(Path(__file__)) + + def test_sync(self): + for v in self.bitwarden._sync.Ciphers: + print(f"{v.Name} - {v.OrganizationId}") + class BitwardenWithEmailTests(unittest.TestCase, BitwardenBaseTests): def setUp(self): diff --git a/tests/e2e/test_vaultwarden.py b/tests/e2e/test_vaultwarden.py index ed6981d..c015bb2 100644 --- a/tests/e2e/test_vaultwarden.py +++ b/tests/e2e/test_vaultwarden.py @@ -4,14 +4,20 @@ from vaultwarden.clients.vaultwarden import VaultwardenAdminClient +from . import env_from_ci + +env_from_ci() + # Get Vaultwarden Admin credentials from environment variables url = os.environ.get("VAULTWARDEN_URL", None) admin_token = os.environ.get("VAULTWARDEN_ADMIN_TOKEN", None) -# TODO Add tests for VaultwardenAdminClient class VaultwardenAdminClientBasic(unittest.TestCase): def setUp(self) -> None: self.vaultwarden = VaultwardenAdminClient( - url=url, admin_secret_token=admin_token + url=url, admin_secret_token=admin_token, preload_users=False ) + + def test_users(self): + self.vaultwarden.users() diff --git a/tests/e2e/test_write.py b/tests/e2e/test_write.py new file mode 100644 index 0000000..28fbf4a --- /dev/null +++ b/tests/e2e/test_write.py @@ -0,0 +1,392 @@ +import os +from pathlib import Path +import random +import secrets +from secrets import token_bytes +import string +import urllib.parse + +import pytest +from vaultwarden.clients.bitwarden import BitwardenAPIClient +from vaultwarden.clients.vaultwarden import VaultwardenAdminClient +from vaultwarden.models.bitwarden import ( + Card, + CardData, + CipherDetails, + Identity, + IdentityData, + Kdf, + Login, + LoginData, + Organization, + OrganizationCollection, + SecureNote, + SecureNoteData, + SSHKeyData, + UriMatch, + UriMatchDetection, +) + + +@pytest.fixture +def test_account(): + from . import env_from_ci + + env_from_ci() + + # Get Bitwarden credentials from environment variables + url = os.environ.get("BITWARDEN_URL", None) + email = os.environ.get("BITWARDEN_EMAIL", None) + password = os.environ.get("BITWARDEN_PASSWORD", None) + client_id = os.environ.get("BITWARDEN_CLIENT_ID", None) + client_secret = os.environ.get("BITWARDEN_CLIENT_SECRET", None) + device_id = os.environ.get("BITWARDEN_DEVICE_ID", None) + + # Get test organization id from environment variables + # test_organization = os.environ.get("BITWARDEN_TEST_ORGANIZATION", None) + + c = BitwardenAPIClient( + url, + email, + password, + client_id, + client_secret, + device_id, + ) + + c.sync() + yield c + c.close() + + +@pytest.fixture +def admin(test_account): + admin_secret_token = os.environ.get("VAULTWARDEN_ADMIN_TOKEN", None) + c = VaultwardenAdminClient( + test_account.url, admin_secret_token, preload_users=False + ) + yield c + c.close() + + +@pytest.fixture +def user() -> dict: + u = "".join(random.choices(string.ascii_lowercase + string.digits, k=10)) + email = f"{u}@example.org" + return dict(email=email, password=u, name=u, kdf=Kdf.argon2id()) + + +@pytest.fixture +def organization(test_account: BitwardenAPIClient, user: dict) -> Organization: + return test_account.create_organization( + name=f'Org {user["email"].partition("@")[0]}', email=user["email"] + ) + + +@pytest.fixture +def login() -> "Login": + uri = urllib.parse.urlparse( + url := "http://username:password@login.example.org" + ) + key = secrets.token_bytes(64) + + data = LoginData.model_construct( + name=uri.hostname, + password=uri.username, + username=uri.password, + uris=[UriMatch.model_construct(match=UriMatchDetection.HOST, uri=url)], + ) + item = Login.model_construct( + name=f"{uri.username}@{uri.hostname}", + login=data, + data=data, + key=key, + ) + return item + + +@pytest.fixture +def securenote() -> "SecureNote": + uri = urllib.parse.urlparse( + url := "http://username:password@securenote.example.org" + ) + key = secrets.token_bytes(64) + + data = SecureNoteData.model_construct( + Notes="".join(random.choices(string.ascii_letters, k=10)), + uris=[UriMatch.model_construct(match=UriMatchDetection.HOST, uri=url)], + ) + item = SecureNote.model_construct( + Name=f"{uri.username}@{uri.hostname}", + SecureNote=data, + Data=data, + Key=key, + ) + return item + + +@pytest.fixture +def card(): + key = secrets.token_bytes(64) + data = CardData.model_construct( + cardholderName="user", + brand="VISA", + code="123", + expMonth="11", + expYear="2020", + number="1204391293", + ) + item = Card.model_construct( + Name="user@VISA", + Card=data, + Data=data, + Key=key, + ) + return item + + +@pytest.fixture +def identity(): + key = secrets.token_bytes(64) + data = IdentityData.model_construct( + title="Mrs.", + firstName="A", + middleName="B", + lastName="C", + username="abc", + company="Z", + ssn="1", + passportNumber="2", + licenseNumber="3", + email="abc@Z.org", + phone="112", + address1="a1", + address2="a2", + address3="a3", + city="City", + state="State", + postalCode="1", + country="X", + ) + item = Identity.model_construct( + Name="user@Z", + Identity=data, + Data=data, + Key=key, + ) + return item + + +@pytest.fixture +def sshkey(): + fp = "SHA256:0uYSZPry8sa7UC/sfjLZCgjggJ12KhHHeD+BP0hew50" + priv = """-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACBMlizY/9+h3pZlH9ADEGOaL/aRnBA0XveKurHXW66oAwAAAIgdq/EQHavx +EAAAAAtzc2gtZWQyNTUxOQAAACBMlizY/9+h3pZlH9ADEGOaL/aRnBA0XveKurHXW66oAw +AAAEAjVrd/TKd20aXb5qdh15Jjqw3GNEhQ+dLBx0nfV7X29UyWLNj/36HelmUf0AMQY5ov +9pGcEDRe94q6sddbrqgDAAAAAAECAwQF +-----END OPENSSH PRIVATE KEY----- + """ + pub = ( + "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEyWLNj" + "/36HelmUf0AMQY5ov9pGcEDRe94q6sddbrqgD" + ) + + data = SSHKeyData.model_construct( + keyFingerprint=fp, privateKey=priv, publicKey=pub + ) + item = SSHKeyData.model_construct(sshKey=data) + return item + + +@pytest.fixture +def collection(organization: Organization) -> OrganizationCollection: + return organization.create_collection("Test Collection") + + +@pytest.fixture +def ciphers(login, securenote, card, identity) -> list[CipherDetails]: + return [login, securenote, card, identity] + + +def test_user( + test_account: BitwardenAPIClient, + admin: VaultwardenAdminClient, + user: dict, + organization: Organization, + collection: OrganizationCollection, + ciphers, +): + # create + test_account.create_user(**user) + + # invite + organization.invite(user["email"]) + + # confirm + users = organization.users(force_refresh=True, search=user["email"]) + assert len(users) == 1 + u = users[0] + organization.confirm(u) + + # add to collection + u.add_collections([collection.Id]) + + # cleanup + organization.delete() + admin.delete(u.Id) + + +def test_org_create_ciphers( + test_account: BitwardenAPIClient, + organization: Organization, + collection: OrganizationCollection, + ciphers: list[CipherDetails], +): + for c in ciphers: + test_account.create_item(c, organization, [collection]) + + organization.delete() + + +@pytest.fixture( + params=[token_bytes(64), None], + ids=["with key", "without key"], +) +def cipher_key(request): + return request.param + + +def test_user_create_cipher( + test_account: BitwardenAPIClient, login, cipher_key +): + login.Key = cipher_key + v = test_account.create_item(login, None, None) + v.delete() + + +def test_org_create_cipher( + test_account: BitwardenAPIClient, + login, + cipher_key, + organization, + collection, +): + login.Key = cipher_key + v = test_account.create_item(login, organization, [collection]) + v.delete() + + +def test_cleanup_users(admin: VaultwardenAdminClient): + for i in admin.users(): + if i.Email.endswith("@example.org"): + admin.delete(i.Id) + + +SEARCH_ITEMS = [ + # ("http://default.com", "http://default.com", None), + ( + "http://sub.basedomain.com", + "http://basedomain.com", + UriMatchDetection.BASEDOMAIN, + ), + ("http://host.com/a", "http://host.com", UriMatchDetection.HOST), + ( + "http://startswith.com/a/b", + "http://startswith.com/a", + UriMatchDetection.STARTSWITH, + ), + ("http://re.com", r"^http://re\.c.m", UriMatchDetection.RE), + ("http://exact.com", "http://exact.com", UriMatchDetection.EXACT), +] + + +@pytest.fixture( + params=SEARCH_ITEMS, + ids=[urllib.parse.urlparse(url).hostname for url, *_ in SEARCH_ITEMS], +) +def logins(request, test_account, organization, collection): + url, uri, match = request.param + name = urllib.parse.urlparse(url).hostname + data = LoginData.model_construct( + name=name, + password="test123", + username="test", + Uris=[UriMatch.model_construct(match=match, uri=uri)], + ) + item = Login.model_construct( + name=name, + login=data, + data=data, + key=secrets.token_bytes(64), + ) + test_account.create_item(item, organization, [collection]) + return url, uri, match + + +def test_search( + test_account: BitwardenAPIClient, + organization: Organization, + collection: OrganizationCollection, + logins, +): + test_account.sync(force_refresh=True) + url, uri, match = logins + + r = list( + test_account.search_items( + url, organisations=[organization], collections=[collection] + ) + ) + assert len(r) == 1, url + assert r[0].Name == urllib.parse.urlparse(url).hostname + + +def test_edit( + test_account: BitwardenAPIClient, + organization: Organization, + collection: OrganizationCollection, + logins, +): + test_account.sync(force_refresh=True) + url, uri, match = logins + + r = list( + test_account.search_items( + url, organisations=[organization], collections=[collection] + ) + ) + assert len(r) == 1, url + assert r[0].Name == urllib.parse.urlparse(url).hostname + lo: Login = r[0] + assert lo.Login.username == "test" + lo.Login.username = lo.Login.password = "edit" + lo.save() + test_account.sync(force_refresh=True) + r = list( + test_account.search_items( + url, organisations=[organization], collections=[collection] + ) + ) + assert len(r) == 1, url + lo: Login = r[0] + assert lo.Login.username == lo.Login.password == "edit" + + +def test_attach( + test_account: BitwardenAPIClient, + organization: Organization, + collection: OrganizationCollection, + login: Login, +): + test_account.sync(force_refresh=True) + v = test_account.create_item(login, organization, [collection]) + v.attach(Path(__file__)) + + +def test_org_clean(admin, test_account): + for i in test_account._sync.Profile.Organizations: + if i.Name.startswith("Test "): + continue + admin.delete_organization(i.Id) diff --git a/tests/models/validation/__init__.py b/tests/models/validation/__init__.py index e69de29..251f548 100644 --- a/tests/models/validation/__init__.py +++ b/tests/models/validation/__init__.py @@ -0,0 +1,41 @@ +from typing import Any + +from vaultwarden.models.crypto import CryptoContext + + +def default_ctx(account: str = "test-account") -> Any: + import json + from pathlib import Path + + from vaultwarden.clients.bitwarden import BitwardenAPIClient + from vaultwarden.models.sync import ConnectToken + + client = BitwardenAPIClient( + url=".", + email=f"{account}@example.com", + password=account, + client_id=".", + device_id=".", + client_secret=".", + ) + ctx = CryptoContext(client) + + payload = json.loads( + Path(f"tests/fixtures/{account}/sync_camel.json").read_text() + ) + + ct = { + "Kdf": 0, + "KdfIterations": 600000, + "Key": payload["profile"]["key"], + "PrivateKey": payload["profile"]["privateKey"], + "access_token": "", + "expires_in": 3600, + "token_type": "", + "scope": "", + } + + client._connect_token = ConnectToken.model_validate(ct, context=ctx) + + client._sync_step(payload) + return ctx diff --git a/tests/models/validation/test_bitwarden_models.py b/tests/models/validation/test_bitwarden_models.py index 657296e..6239213 100644 --- a/tests/models/validation/test_bitwarden_models.py +++ b/tests/models/validation/test_bitwarden_models.py @@ -1,4 +1,5 @@ import unittest +from uuid import UUID from pydantic import TypeAdapter from vaultwarden.models.bitwarden import ( @@ -7,6 +8,9 @@ OrganizationUserDetails, ResplistBitwarden, ) +from vaultwarden.models.crypto import CryptoContext + +from . import default_ctx class TestBitwardenModels(unittest.TestCase): @@ -19,20 +23,24 @@ def test_organization(self): payload = self.read_json_payload( "tests/fixtures/test-organization/organization_camel.json" ) - data = Organization.model_validate_json(payload) + data = Organization.model_validate_json( + payload, context=CryptoContext(client=None) + ) assert data.Name == "Test Organization" def test_organization_users(self): payload = self.read_json_payload( "tests/fixtures/test-organization/users_camel.json" ) + ctx = default_ctx() + ctx.parent_id = UUID("cda840d2-1de0-4f31-bd49-b30dacd7e8b0") users = ( ResplistBitwarden[OrganizationUserDetails] .model_validate_json( payload, - context={"parent_id": "cda840d2-1de0-4f31-bd49-b30dacd7e8b0"}, + context=ctx, ) - .model_validate_json(payload) + .model_validate_json(payload, context=ctx) ) assert len(users.Data) == 2 assert users.Data[0].Email == "test-account@example.com" @@ -47,11 +55,17 @@ def test_organization_collections(self): ) collection1 = TypeAdapter(list[CollectionUser]).validate_json( payload1, - context={"parent_id": "9ed17918-31f6-4ac5-ac82-c11541cd8a7c"}, + context=CryptoContext( + client=None, + parent_id=UUID("9ed17918-31f6-4ac5-ac82-c11541cd8a7c"), + ), ) collection2 = TypeAdapter(list[CollectionUser]).validate_json( payload2, - context={"parent_id": "3c73f14f-5a01-4016-98bb-9605146a1a49"}, + context=CryptoContext( + client=None, + parent_id=UUID("3c73f14f-5a01-4016-98bb-9605146a1a49"), + ), ) assert len(collection1) == 0 diff --git a/tests/models/validation/test_pascal_camel_cases.py b/tests/models/validation/test_pascal_camel_cases.py index e82ebd1..c7a8620 100644 --- a/tests/models/validation/test_pascal_camel_cases.py +++ b/tests/models/validation/test_pascal_camel_cases.py @@ -8,6 +8,8 @@ ) from vaultwarden.models.sync import SyncData, VaultwardenUser +from . import default_ctx + class TestModelCases(unittest.TestCase): @staticmethod @@ -22,17 +24,25 @@ def test_organization(self): camel_case_payload = self.read_json_payload( "tests/fixtures/test-organization/organization_camel.json" ) - pascal = Organization.model_validate_json(pascal_case_payload) - camel = Organization.model_validate_json(camel_case_payload) + ctx = default_ctx() + pascal = Organization.model_validate_json( + pascal_case_payload, context=ctx + ) + camel = Organization.model_validate_json( + camel_case_payload, context=ctx + ) self.assertEqual(pascal.Name, camel.Name) def test_collections(self): + ctx = default_ctx() + ctx.push(ctx.client._sync.Profile.Organizations[0].Key) + pascal_case_payload = self.read_json_payload( "tests/fixtures/test-organization/collections/collections_pascal.json" ) pascal_collections = ( ResplistBitwarden[OrganizationCollection] - .model_validate_json(pascal_case_payload) + .model_validate_json(pascal_case_payload, context=ctx) .Data ) camel_case_payload = self.read_json_payload( @@ -40,7 +50,7 @@ def test_collections(self): ) camel_collections = ( ResplistBitwarden[OrganizationCollection] - .model_validate_json(camel_case_payload) + .model_validate_json(camel_case_payload, context=ctx) .Data ) self.assertEqual(len(pascal_collections), len(camel_collections)) @@ -48,14 +58,15 @@ def test_collections(self): self.assertEqual(pascal_collections[1].Name, camel_collections[1].Name) def test_sync_data(self): + ctx = default_ctx() pascal_case_payload = self.read_json_payload( "tests/fixtures/test-account/sync_pascal.json" ) camel_case_payload = self.read_json_payload( "tests/fixtures/test-account/sync_camel.json" ) - pascal = SyncData.model_validate_json(pascal_case_payload) - camel = SyncData.model_validate_json(camel_case_payload) + pascal = SyncData.model_validate_json(pascal_case_payload, context=ctx) + camel = SyncData.model_validate_json(camel_case_payload, context=ctx) self.assertEqual(len(pascal.Ciphers), len(camel.Ciphers)) self.assertEqual(len(pascal.Collections), len(camel.Collections)) self.assertEqual( diff --git a/tests/models/validation/test_sync_models.py b/tests/models/validation/test_sync_models.py index f438f54..063c849 100644 --- a/tests/models/validation/test_sync_models.py +++ b/tests/models/validation/test_sync_models.py @@ -2,6 +2,8 @@ from vaultwarden.models.sync import SyncData +from . import default_ctx + class TestSyncModels(unittest.TestCase): @staticmethod @@ -13,7 +15,9 @@ def test_syncdata(self): payload = self.read_json_payload( "tests/fixtures/test-account/sync_camel.json" ) - data = SyncData.model_validate_json(payload) + ctx = default_ctx() + + data = SyncData.model_validate_json(payload, context=ctx) assert len(data.Ciphers) == 2 assert len(data.Collections) == 3 assert len(data.Profile.Organizations) == 1