diff --git a/ulid/__init__.py b/ulid/__init__.py index 96178e3..f72e781 100644 --- a/ulid/__init__.py +++ b/ulid/__init__.py @@ -80,12 +80,25 @@ class ULID: ValueError: If the provided value is not a valid encoded ULID. """ - def __init__(self, value: bytes | None = None) -> None: - if value is not None and len(value) != constants.BYTES_LEN: - raise ValueError("ULID has to be exactly 16 bytes long.") - self.bytes: bytes = ( - value or ULID.from_timestamp(time.time_ns() // constants.NANOSECS_IN_MILLISECS).bytes - ) + def __init__(self, value: bytes | str | None = None) -> None: + if value is None: + value = self._gen_bytes_from_ts() + elif isinstance(value, bytes): + if len(value) != constants.BYTES_LEN: + raise ValueError(f"ULID has to be exactly {constants.BYTES_LEN} bytes long.") + elif isinstance(value, str): + if len(value) != constants.REPR_LEN: + raise ValueError(f"ULID has to be exactly {constants.REPR_LEN} characters long.") + value = base32.decode(value) + + self.bytes: bytes = value + + @staticmethod + def _gen_bytes_from_ts(ts_ms: int | None = None) -> bytes: + """Generate a new ULID bytes from the timestamp(ms).""" + + ts_ms = ts_ms or time.time_ns() // constants.NANOSECS_IN_MILLISECS + return ts_ms.to_bytes(constants.TIMESTAMP_LEN, "big") + os.urandom(constants.RANDOMNESS_LEN) @classmethod @validate_type(datetime) @@ -116,9 +129,7 @@ def from_timestamp(cls: type[U], value: float) -> U: """ if isinstance(value, float): value = int(value * constants.MILLISECS_IN_SECS) - timestamp = int.to_bytes(value, constants.TIMESTAMP_LEN, "big") - randomness = os.urandom(constants.RANDOMNESS_LEN) - return cls.from_bytes(timestamp + randomness) + return cls.from_bytes(cls._gen_bytes_from_ts(value)) @classmethod @validate_type(uuid.UUID) @@ -190,7 +201,7 @@ def parse(cls: type[U], value: Any) -> U: return cls.from_bytes(value) raise TypeError(f"Cannot parse ULID from type {type(value)}") - @property + @functools.cached_property def milliseconds(self) -> int: """The timestamp part as epoch time in milliseconds. @@ -201,7 +212,7 @@ def milliseconds(self) -> int: """ return int.from_bytes(self.bytes[: constants.TIMESTAMP_LEN], byteorder="big") - @property + @functools.cached_property def timestamp(self) -> float: """The timestamp part as epoch time in seconds. @@ -212,7 +223,7 @@ def timestamp(self) -> float: """ return self.milliseconds / constants.MILLISECS_IN_SECS - @property + @functools.cached_property def datetime(self) -> datetime: """Return the timestamp part as timezone-aware :class:`datetime` in UTC. @@ -223,7 +234,7 @@ def datetime(self) -> datetime: """ return datetime.fromtimestamp(self.timestamp, timezone.utc) - @property + @functools.cached_property def hex(self) -> str: """Encode the :class:`ULID`-object as a 32 char sequence of hex values.""" return self.bytes.hex() @@ -249,7 +260,7 @@ def to_uuid4(self) -> uuid.UUID: return uuid.UUID(bytes=self.bytes, version=4) def __repr__(self) -> str: - return f"ULID({self!s})" + return f"ULID({str(self)!r})" def __str__(self) -> str: """Encode this object as a 26 character string sequence.""" @@ -297,7 +308,9 @@ def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler core_schema.union_schema([ core_schema.is_instance_schema(ULID), core_schema.no_info_plain_validator_function(ULID), - core_schema.str_schema(pattern=r"[A-Z0-9]{26}", min_length=26, max_length=26), + core_schema.str_schema( + pattern=rf"[{base32.ENCODE}]{{26}}", min_length=26, max_length=26 + ), core_schema.bytes_schema(min_length=16, max_length=16), ]), serialization=core_schema.to_string_ser_schema( diff --git a/ulid/base32.py b/ulid/base32.py index fb94031..f6faa90 100644 --- a/ulid/base32.py +++ b/ulid/base32.py @@ -7,7 +7,7 @@ # https://github.com/RobThree/NUlid/blob/89f5a9fc827d191ae5adafe42547575ed3a47723/NUlid/Ulid.cs#L168 ENCODE: str = "0123456789ABCDEFGHJKMNPQRSTVWXYZ" -DECODE: Sequence[int] = [ +DECODE: Sequence[int] = ( 0xFF, 0xFF, 0xFF, @@ -138,7 +138,7 @@ 0xFF, 0xFF, 0xFF, -] +) def encode(binary: bytes) -> str: @@ -171,7 +171,7 @@ def encode_randomness(binary: bytes) -> str: if len(binary) != constants.RANDOMNESS_LEN: raise ValueError("Randomness value has to be exactly 10 bytes long.") lut = ENCODE - return "".join([ + return "".join(( lut[(binary[0] & 248) >> 3], lut[((binary[0] & 7) << 2) | ((binary[1] & 192) >> 6)], lut[(binary[1] & 62) >> 1], @@ -188,7 +188,7 @@ def encode_randomness(binary: bytes) -> str: lut[(binary[8] & 124) >> 2], lut[((binary[8] & 3) << 3) | ((binary[9] & 224) >> 5)], lut[(binary[9] & 31)], - ]) + )) def decode(encoded: str) -> bytes: @@ -209,14 +209,14 @@ def decode_timestamp(encoded: str) -> bytes: # https://github.com/ulid/spec?tab=readme-ov-file#overflow-errors-when-parsing-base32-strings if lut[values[0]] > 7: # noqa: PLR2004 raise ValueError(f"Timestamp value {encoded} is too large and will overflow 128-bits.") - return bytes([ + return bytes(( ((lut[values[0]] << 5) | lut[values[1]]) & 0xFF, ((lut[values[2]] << 3) | (lut[values[3]] >> 2)) & 0xFF, ((lut[values[3]] << 6) | (lut[values[4]] << 1) | (lut[values[5]] >> 4)) & 0xFF, ((lut[values[5]] << 4) | (lut[values[6]] >> 1)) & 0xFF, ((lut[values[6]] << 7) | (lut[values[7]] << 2) | (lut[values[8]] >> 3)) & 0xFF, ((lut[values[8]] << 5) | (lut[values[9]])) & 0xFF, - ]) + )) def decode_randomness(encoded: str) -> bytes: @@ -224,7 +224,7 @@ def decode_randomness(encoded: str) -> bytes: raise ValueError("ULID randomness has to be exactly 16 characters long.") lut = DECODE values = bytes(encoded, "ascii") - return bytes([ + return bytes(( ((lut[values[0]] << 3) | (lut[values[1]] >> 2)) & 0xFF, ((lut[values[1]] << 6) | (lut[values[2]] << 1) | (lut[values[3]] >> 4)) & 0xFF, ((lut[values[3]] << 4) | (lut[values[4]] >> 1)) & 0xFF, @@ -235,4 +235,4 @@ def decode_randomness(encoded: str) -> bytes: ((lut[values[11]] << 4) | (lut[values[12]] >> 1)) & 0xFF, ((lut[values[12]] << 7) | (lut[values[13]] << 2) | (lut[values[14]] >> 3)) & 0xFF, ((lut[values[14]] << 5) | (lut[values[15]])) & 0xFF, - ]) + ))