diff --git a/tests/test_ulid.py b/tests/test_ulid.py index 11ae933..f9e0e8e 100644 --- a/tests/test_ulid.py +++ b/tests/test_ulid.py @@ -74,6 +74,14 @@ def test_same_millisecond_overflow() -> None: ULID() +def test_same_millisecond_monotonic_sorting_without_frozen_time() -> None: + previous_ulid = "" + for i in range(10000): + new_ulid = ULID() + assert new_ulid > previous_ulid + previous_ulid = new_ulid + + def assert_sorted(seq: list) -> None: last = seq[0] for item in seq[1:]: diff --git a/ulid/__init__.py b/ulid/__init__.py index d327385..c22b342 100644 --- a/ulid/__init__.py +++ b/ulid/__init__.py @@ -69,10 +69,9 @@ def timestamp(self, value: float | None = None) -> int: raise ValueError("Value exceeds maximum possible timestamp") return value - def randomness(self) -> bytes: + def randomness(self, timestamp: int) -> bytes: with self.lock: - current_timestamp = self.timestamp() - if current_timestamp == self.prev_timestamp: + if timestamp == self.prev_timestamp: if self.prev_randomness == constants.MAX_RANDOMNESS: raise ValueError("Randomness within same millisecond exhausted") randomness = self.increment_bytes(self.prev_randomness) @@ -80,7 +79,7 @@ def randomness(self) -> bytes: randomness = os.urandom(constants.RANDOMNESS_LEN) self.prev_randomness = randomness - self.prev_timestamp = current_timestamp + self.prev_timestamp = timestamp return randomness def increment_bytes(self, value: bytes) -> bytes: @@ -148,8 +147,9 @@ def from_timestamp(cls, value: float) -> Self: >>> ULID.from_timestamp(time.time()) ULID(01E75QWN5HKQ0JAVX9FG1K4YP4) """ - timestamp = int.to_bytes(cls.provider.timestamp(value), constants.TIMESTAMP_LEN, "big") - randomness = cls.provider.randomness() + timestamp_value = cls.provider.timestamp(value) + timestamp = int.to_bytes(timestamp_value, constants.TIMESTAMP_LEN, "big") + randomness = cls.provider.randomness(timestamp_value) return cls.from_bytes(timestamp + randomness) @classmethod