Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions ulid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,25 @@ class ULID:
ValueError: If the provided value is not a valid encoded ULID.
"""

def __init__(self, value: bytes | None = None) -> None:
if value is not None and len(value) != constants.BYTES_LEN:
raise ValueError("ULID has to be exactly 16 bytes long.")
self.bytes: bytes = (
value or ULID.from_timestamp(time.time_ns() // constants.NANOSECS_IN_MILLISECS).bytes
)
def __init__(self, value: bytes | str | None = None) -> None:
if value is None:
value = self._gen_bytes_from_ts()
elif isinstance(value, bytes):
if len(value) != constants.BYTES_LEN:
raise ValueError(f"ULID has to be exactly {constants.BYTES_LEN} bytes long.")
elif isinstance(value, str):
if len(value) != constants.REPR_LEN:
raise ValueError(f"ULID has to be exactly {constants.REPR_LEN} characters long.")
value = base32.decode(value)

self.bytes: bytes = value

@staticmethod
def _gen_bytes_from_ts(ts_ms: int | None = None) -> bytes:
"""Generate a new ULID bytes from the timestamp(ms)."""
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. All the other methods use the longer word "timestamp", so I would like to keep it consistent. What do you think about _bytes_from_timestamp(...)?
  2. The docstring could be "Generate a ULID byte sequence from a timestamp".


ts_ms = ts_ms or time.time_ns() // constants.NANOSECS_IN_MILLISECS
return ts_ms.to_bytes(constants.TIMESTAMP_LEN, "big") + os.urandom(constants.RANDOMNESS_LEN)

@classmethod
@validate_type(datetime)
Expand Down Expand Up @@ -116,9 +129,7 @@ def from_timestamp(cls: type[U], value: float) -> U:
"""
if isinstance(value, float):
value = int(value * constants.MILLISECS_IN_SECS)
timestamp = int.to_bytes(value, constants.TIMESTAMP_LEN, "big")
randomness = os.urandom(constants.RANDOMNESS_LEN)
return cls.from_bytes(timestamp + randomness)
return cls.from_bytes(cls._gen_bytes_from_ts(value))

@classmethod
@validate_type(uuid.UUID)
Expand Down Expand Up @@ -190,7 +201,7 @@ def parse(cls: type[U], value: Any) -> U:
return cls.from_bytes(value)
raise TypeError(f"Cannot parse ULID from type {type(value)}")

@property
@functools.cached_property
def milliseconds(self) -> int:
"""The timestamp part as epoch time in milliseconds.

Expand All @@ -201,7 +212,7 @@ def milliseconds(self) -> int:
"""
return int.from_bytes(self.bytes[: constants.TIMESTAMP_LEN], byteorder="big")

@property
@functools.cached_property
def timestamp(self) -> float:
"""The timestamp part as epoch time in seconds.

Expand All @@ -212,7 +223,7 @@ def timestamp(self) -> float:
"""
return self.milliseconds / constants.MILLISECS_IN_SECS

@property
@functools.cached_property
def datetime(self) -> datetime:
"""Return the timestamp part as timezone-aware :class:`datetime` in UTC.

Expand All @@ -223,7 +234,7 @@ def datetime(self) -> datetime:
"""
return datetime.fromtimestamp(self.timestamp, timezone.utc)

@property
@functools.cached_property
def hex(self) -> str:
"""Encode the :class:`ULID`-object as a 32 char sequence of hex values."""
return self.bytes.hex()
Expand All @@ -249,7 +260,7 @@ def to_uuid4(self) -> uuid.UUID:
return uuid.UUID(bytes=self.bytes, version=4)

def __repr__(self) -> str:
return f"ULID({self!s})"
return f"ULID({str(self)!r})"

def __str__(self) -> str:
"""Encode this object as a 26 character string sequence."""
Expand Down Expand Up @@ -297,7 +308,9 @@ def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler
core_schema.union_schema([
core_schema.is_instance_schema(ULID),
core_schema.no_info_plain_validator_function(ULID),
core_schema.str_schema(pattern=r"[A-Z0-9]{26}", min_length=26, max_length=26),
core_schema.str_schema(
pattern=rf"[{base32.ENCODE}]{{26}}", min_length=26, max_length=26
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a test that would need to be adapted. Good catch on the letter "I", though: the old pattern accepted it even though it is not part of the base32 alphabet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I missed this issue; I'll follow up with a fix.

),
core_schema.bytes_schema(min_length=16, max_length=16),
]),
serialization=core_schema.to_string_ser_schema(
Expand Down
16 changes: 8 additions & 8 deletions ulid/base32.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# https://github.com/RobThree/NUlid/blob/89f5a9fc827d191ae5adafe42547575ed3a47723/NUlid/Ulid.cs#L168

ENCODE: str = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
DECODE: Sequence[int] = [
DECODE: Sequence[int] = (
0xFF,
0xFF,
0xFF,
Expand Down Expand Up @@ -138,7 +138,7 @@
0xFF,
0xFF,
0xFF,
]
)


def encode(binary: bytes) -> str:
Expand Down Expand Up @@ -171,7 +171,7 @@ def encode_randomness(binary: bytes) -> str:
if len(binary) != constants.RANDOMNESS_LEN:
raise ValueError("Randomness value has to be exactly 10 bytes long.")
lut = ENCODE
return "".join([
return "".join((
lut[(binary[0] & 248) >> 3],
lut[((binary[0] & 7) << 2) | ((binary[1] & 192) >> 6)],
lut[(binary[1] & 62) >> 1],
Expand All @@ -188,7 +188,7 @@ def encode_randomness(binary: bytes) -> str:
lut[(binary[8] & 124) >> 2],
lut[((binary[8] & 3) << 3) | ((binary[9] & 224) >> 5)],
lut[(binary[9] & 31)],
])
))


def decode(encoded: str) -> bytes:
Expand All @@ -209,22 +209,22 @@ def decode_timestamp(encoded: str) -> bytes:
# https://github.com/ulid/spec?tab=readme-ov-file#overflow-errors-when-parsing-base32-strings
if lut[values[0]] > 7: # noqa: PLR2004
raise ValueError(f"Timestamp value {encoded} is too large and will overflow 128-bits.")
return bytes([
return bytes((
((lut[values[0]] << 5) | lut[values[1]]) & 0xFF,
((lut[values[2]] << 3) | (lut[values[3]] >> 2)) & 0xFF,
((lut[values[3]] << 6) | (lut[values[4]] << 1) | (lut[values[5]] >> 4)) & 0xFF,
((lut[values[5]] << 4) | (lut[values[6]] >> 1)) & 0xFF,
((lut[values[6]] << 7) | (lut[values[7]] << 2) | (lut[values[8]] >> 3)) & 0xFF,
((lut[values[8]] << 5) | (lut[values[9]])) & 0xFF,
])
))


def decode_randomness(encoded: str) -> bytes:
if len(encoded) != constants.RANDOMNESS_REPR_LEN:
raise ValueError("ULID randomness has to be exactly 16 characters long.")
lut = DECODE
values = bytes(encoded, "ascii")
return bytes([
return bytes((
((lut[values[0]] << 3) | (lut[values[1]] >> 2)) & 0xFF,
((lut[values[1]] << 6) | (lut[values[2]] << 1) | (lut[values[3]] >> 4)) & 0xFF,
((lut[values[3]] << 4) | (lut[values[4]] >> 1)) & 0xFF,
Expand All @@ -235,4 +235,4 @@ def decode_randomness(encoded: str) -> bytes:
((lut[values[11]] << 4) | (lut[values[12]] >> 1)) & 0xFF,
((lut[values[12]] << 7) | (lut[values[13]] << 2) | (lut[values[14]] >> 3)) & 0xFF,
((lut[values[14]] << 5) | (lut[values[15]])) & 0xFF,
])
))
Loading