diff --git a/packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php b/packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php index 034d960f1..3a324062a 100644 --- a/packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php +++ b/packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php @@ -253,11 +253,11 @@ public function test_delaying_the_message_with_native_clock() ->build() ); - $ecotoneLite->waitTill(TimeSpan::withSeconds(1)); + $ecotoneLite->advanceTimeTo(Duration::seconds(1)); $this->assertNull($messageChannel->receive()); - $ecotoneLite->waitTill(TimeSpan::withSeconds(3)); + $ecotoneLite->advanceTimeTo(Duration::seconds(3)); $this->assertNotNull($messageChannel->receive()); } @@ -289,11 +289,11 @@ public function test_delaying_the_message_with_native_clock_using_date_time() /** @var EcotoneClockInterface $clock */ $clock = $ecotoneLite->getServiceFromContainer(EcotoneClockInterface::class); - $ecotoneLite->waitTill($clock->now()->add(Duration::seconds(1))); + $ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(1))); $this->assertNull($messageChannel->receive()); - $ecotoneLite->waitTill($clock->now()->add(Duration::seconds(3))); + $ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(3))); $this->assertNotNull($messageChannel->receive()); } diff --git a/packages/Ecotone/src/Lite/Test/ConfiguredMessagingSystemWithTestSupport.php b/packages/Ecotone/src/Lite/Test/ConfiguredMessagingSystemWithTestSupport.php index f392edcd7..fc2779657 100644 --- a/packages/Ecotone/src/Lite/Test/ConfiguredMessagingSystemWithTestSupport.php +++ b/packages/Ecotone/src/Lite/Test/ConfiguredMessagingSystemWithTestSupport.php @@ -4,6 +4,7 @@ namespace Ecotone\Lite\Test; +use DateTimeImmutable; use Ecotone\Messaging\Config\ConfiguredMessagingSystem; use Ecotone\Messaging\Config\Container\GatewayProxyMethodReference; use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; @@ -12,12 +13,16 @@ use Ecotone\Messaging\MessageChannel; use Ecotone\Messaging\MessageHeaders; use Ecotone\Messaging\MessagePublisher; +use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry; use Ecotone\Modelling\CommandBus; use Ecotone\Modelling\DistributedBus; use Ecotone\Modelling\EventBus; use Ecotone\Modelling\QueryBus; +use Ecotone\Test\StaticPsrClock; +use InvalidArgumentException; +use Psr\Clock\ClockInterface; /** * licence Apache-2.0 @@ -129,4 +134,51 @@ public function replaceWith(ConfiguredMessagingSystem $messagingSystem): void { $this->configuredMessagingSystem->replaceWith($messagingSystem); } + + public function changeTime(DateTimeImmutable|Duration $time): self + { + $psrClock = $this->getStaticPsrClockFromContainer(); + + if ($time instanceof Duration) { + $psrClock->setCurrentTime( + DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$time->inMicroseconds()} microseconds") + ); + return $this; + } + + if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) { + throw new InvalidArgumentException( + sprintf( + 'Cannot move time backwards. Current clock time: %s, requested time: %s', + $psrClock->now()->format('Y-m-d H:i:s.u'), + $time->format('Y-m-d H:i:s.u') + ) + ); + } + + $psrClock->setCurrentTime($time); + + return $this; + } + + private function getStaticPsrClockFromContainer(): StaticPsrClock + { + try { + $psrClock = $this->configuredMessagingSystem->getServiceFromContainer(ClockInterface::class); + } catch (\Throwable) { + throw new InvalidArgumentException( + 'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' . + 'Register ClockInterface::class => new StaticPsrClock() in your container services.' + ); + } + + if (! $psrClock instanceof StaticPsrClock) { + throw new InvalidArgumentException( + 'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' . + 'Register ClockInterface::class => new StaticPsrClock() in your container services.' + ); + } + + return $psrClock; + } } diff --git a/packages/Ecotone/src/Lite/Test/FlowTestSupport.php b/packages/Ecotone/src/Lite/Test/FlowTestSupport.php index 28797815e..7112d4479 100644 --- a/packages/Ecotone/src/Lite/Test/FlowTestSupport.php +++ b/packages/Ecotone/src/Lite/Test/FlowTestSupport.php @@ -4,6 +4,7 @@ namespace Ecotone\Lite\Test; +use DateTimeImmutable; use DateTimeInterface; use Ecotone\EventSourcing\EventStore; use Ecotone\EventSourcing\ProjectionManager; @@ -18,6 +19,7 @@ use Ecotone\Messaging\MessagingException; use Ecotone\Messaging\PollableChannel; use Ecotone\Messaging\Scheduling\Clock; +use Ecotone\Messaging\Scheduling\Duration; use Ecotone\Messaging\Scheduling\EcotoneClockInterface; use Ecotone\Messaging\Scheduling\TimeSpan; use Ecotone\Messaging\Support\Assert; @@ -32,6 +34,8 @@ use Ecotone\Modelling\EventBus; use Ecotone\Modelling\QueryBus; use Ecotone\Projecting\ProjectionRegistry; +use Ecotone\Test\StaticPsrClock; +use InvalidArgumentException; /** * @template T @@ -191,23 +195,65 @@ public function getEventStreamEvents(string $streamName): array return $this->getGateway(EventStore::class)->load($streamName); } - public function waitTill(TimeSpan|DateTimeInterface $time): self + public function changeTimeTo(DateTimeImmutable $time): self { - if ($time instanceof DateTimeInterface) { - if ($time < $this->clock->now()) { - throw new MessagingException("Time to wait is in the past. Now: {$this->clock->now()}, time to wait: {$time}"); - } + $psrClock = $this->getStaticPsrClockFromContainer(); + + if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) { + throw new InvalidArgumentException( + \sprintf( + 'Cannot move time backwards. Current clock time: %s, requested time: %s', + $psrClock->now()->format('Y-m-d H:i:s.u'), + $time->format('Y-m-d H:i:s.u') + ) + ); } - $this->clock->sleep( - $time instanceof TimeSpan - ? $time->toDuration() - : TimeSpan::fromDateInterval($time->diff($this->clock->now()))->toDuration() + $psrClock->setCurrentTime($time); + + return $this; + } + + public function advanceTimeTo(Duration $duration): self + { + $psrClock = $this->getStaticPsrClockFromContainer(); + $psrClock->setCurrentTime( + DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$duration->inMicroseconds()} microseconds") ); return $this; } + private function getStaticPsrClockFromContainer(): StaticPsrClock + { + try { + /** @var Clock $clock */ + $clock = $this->configuredMessagingSystem->getServiceFromContainer(EcotoneClockInterface::class); + } catch (\Throwable) { + throw new InvalidArgumentException( + 'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' . + 'Register ClockInterface::class => new StaticPsrClock() in your container services.' + ); + } + + if (! $clock instanceof Clock) { + throw new InvalidArgumentException( + 'Changing time is only possible when using Clock as the EcotoneClockInterface. ' . + 'Register EcotoneClockInterface::class => new Clock(new StaticPsrClock()) in your container services.' + ); + } + + $clock = $clock->internalClock(); + if (! $clock instanceof StaticPsrClock) { + throw new InvalidArgumentException( + 'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' . + 'Register ClockInterface::class => new StaticPsrClock() in your container services.' + ); + } + + return $clock; + } + /** * @param Event[]|object[]|array[] $events */ diff --git a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/EndpointHeaders/EndpointHeadersInterceptor.php b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/EndpointHeaders/EndpointHeadersInterceptor.php index eaf3fdbeb..4ade191ce 100644 --- a/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/EndpointHeaders/EndpointHeadersInterceptor.php +++ b/packages/Ecotone/src/Messaging/Config/Annotation/ModuleConfiguration/EndpointHeaders/EndpointHeadersInterceptor.php @@ -4,9 +4,14 @@ namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration\EndpointHeaders; +use DateTimeImmutable; use DateTimeInterface; use Ecotone\Messaging\Attribute\Endpoint\AddHeader; use Ecotone\Messaging\Attribute\Endpoint\Delayed; + +use function is_string; +use function preg_match; +use function str_ends_with; use Ecotone\Messaging\Attribute\Endpoint\Priority; use Ecotone\Messaging\Attribute\Endpoint\RemoveHeader; use Ecotone\Messaging\Attribute\Endpoint\TimeToLive; @@ -62,6 +67,10 @@ public function addMetadata(Message $message, ?AddHeader $addHeader, ?Delayed $d ]); } + if (is_string($metadata[MessageHeaders::DELIVERY_DELAY])) { + $metadata[MessageHeaders::DELIVERY_DELAY] = $this->parseDateTimeStringWithRequiredOffset($metadata[MessageHeaders::DELIVERY_DELAY]); + } + $type = Type::createFromVariable($metadata[MessageHeaders::DELIVERY_DELAY]); if (! $type->isCompatibleWith(UnionType::createWith([ Type::int(), @@ -120,4 +129,20 @@ public function getDefinition(): Definition Reference::to(ExpressionEvaluationService::REFERENCE), ]); } + + private function parseDateTimeStringWithRequiredOffset(string $dateTimeString): DateTimeImmutable + { + if (! $this->hasUtcOffset($dateTimeString)) { + throw ConfigurationException::create("Delivery delay string '{$dateTimeString}' must contain a UTC offset (e.g., '+02:00' or 'Z'). Dates without timezone information are ambiguous."); + } + + return new DateTimeImmutable($dateTimeString); + } + + private function hasUtcOffset(string $dateTimeString): bool + { + return preg_match('/[+-]\d{2}:\d{2}$/', $dateTimeString) === 1 + || preg_match('/[+-]\d{4}$/', $dateTimeString) === 1 + || str_ends_with($dateTimeString, 'Z'); + } } diff --git a/packages/Ecotone/src/Messaging/Scheduling/Clock.php b/packages/Ecotone/src/Messaging/Scheduling/Clock.php index f74ca7930..1aaf0be91 100644 --- a/packages/Ecotone/src/Messaging/Scheduling/Clock.php +++ b/packages/Ecotone/src/Messaging/Scheduling/Clock.php @@ -36,6 +36,11 @@ public static function get(): EcotoneClockInterface return self::$globalClock ?? new self(self::defaultClock()); } + public function internalClock(): PsrClockInterface + { + return $this->clock; + } + public function now(): DatePoint { $now = $this->clock->now(); diff --git a/packages/Ecotone/src/Test/StaticPsrClock.php b/packages/Ecotone/src/Test/StaticPsrClock.php index ae99c5748..5028e7466 100644 --- a/packages/Ecotone/src/Test/StaticPsrClock.php +++ b/packages/Ecotone/src/Test/StaticPsrClock.php @@ -14,22 +14,46 @@ */ final class StaticPsrClock implements ClockInterface, SleepInterface { - private Duration $sleepDuration; + private bool $hasBeenChanged = false; + private ?DateTimeImmutable $now = null; - public function __construct(private ?string $now = null) + public function __construct(?string $now = null) { - $this->sleepDuration = Duration::zero(); + $this->now = ($now === null || $now === 'now') ? null : new DateTimeImmutable($now); } public function now(): DateTimeImmutable { - $now = $this->now === null ? new DateTimeImmutable() : new DateTimeImmutable($this->now); + if ($this->now !== null) { + return $this->now; + } - return $now->modify("+{$this->sleepDuration->zeroIfNegative()->inMicroseconds()} microseconds"); + return new DateTimeImmutable('now'); } public function sleep(Duration $duration): void { - $this->sleepDuration = $this->sleepDuration->add($duration); + if ($duration->isNegativeOrZero()) { + return; + } + + if ($this->now === null) { + + usleep($duration->inMicroseconds()); + return; + } + + $this->now = $this->now()->modify("+{$duration->inMicroseconds()} microseconds"); + } + + public function hasBeenChanged(): bool + { + return $this->hasBeenChanged; + } + + public function setCurrentTime(DateTimeImmutable $time): void + { + $this->now = $time; + $this->hasBeenChanged = true; } } diff --git a/packages/Ecotone/tests/Messaging/Fixture/AddHeaders/AddingMultipleHeaders.php b/packages/Ecotone/tests/Messaging/Fixture/AddHeaders/AddingMultipleHeaders.php index 8cd73a061..aa552cecf 100644 --- a/packages/Ecotone/tests/Messaging/Fixture/AddHeaders/AddingMultipleHeaders.php +++ b/packages/Ecotone/tests/Messaging/Fixture/AddHeaders/AddingMultipleHeaders.php @@ -63,4 +63,12 @@ public function testKeepTtlHeader(): void { } + + #[Delayed(expression: 'payload.delay')] + #[Asynchronous('async')] + #[CommandHandler('addHeadersWithStringDelayExpression', endpointId: 'addHeadersWithStringDelayExpressionEndpoint')] + public function withStringDelayExpression(): void + { + + } } diff --git a/packages/Ecotone/tests/Messaging/Integration/Scheduling/DelayedMessageAgainstGlobalClockTest.php b/packages/Ecotone/tests/Messaging/Integration/Scheduling/DelayedMessageAgainstGlobalClockTest.php index 0f981f72b..0d26ff649 100644 --- a/packages/Ecotone/tests/Messaging/Integration/Scheduling/DelayedMessageAgainstGlobalClockTest.php +++ b/packages/Ecotone/tests/Messaging/Integration/Scheduling/DelayedMessageAgainstGlobalClockTest.php @@ -64,34 +64,31 @@ public function test_delayed_message_observes_clock_changes() ); } - public function test_delayed_message_observes_clock_changes_natively_by_moving_time() + public function test_delayed_message_is_released_when_moving_time_forward_using_change_time(): void { $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( - [EcotoneClockInterface::class, OrderService::class, NotificationService::class, CustomNotifier::class], + [OrderService::class, NotificationService::class, CustomNotifier::class], [new OrderService(), new NotificationService(), $notifier = new CustomNotifier()], enableAsynchronousProcessing: [ - // 1. Turn on Delayable In Memory Pollable Channel SimpleMessageChannelBuilder::createQueueChannel('notifications', true), ] ); + $ecotoneTestSupport->changeTimeTo(new \DateTimeImmutable('2025-08-11 16:00:00')); $ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123')); - $clock = Clock::get(); - $clock->sleep(Duration::minutes(1)->add(Duration::seconds(1))); + $ecotoneTestSupport->run('notifications'); + $this->assertCount(0, $notifier->getNotificationsOf('placedOrder')); - // 2. Releasing messages awaiting for 60 seconds + $ecotoneTestSupport->changeTimeTo(new \DateTimeImmutable('2025-08-11 16:01:01')); $ecotoneTestSupport->run('notifications'); - $this->assertEquals( - 1, - count($notifier->getNotificationsOf('placedOrder')) - ); + $this->assertCount(1, $notifier->getNotificationsOf('placedOrder')); } - public function test_clock_moves_in_time_when_not_injected(): void + public function test_delayed_message_is_released_when_advancing_time_using_duration(): void { - EcotoneLite::bootstrapFlowTesting( + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( [OrderService::class, NotificationService::class, CustomNotifier::class], [new OrderService(), new NotificationService(), $notifier = new CustomNotifier()], enableAsynchronousProcessing: [ @@ -99,9 +96,126 @@ public function test_clock_moves_in_time_when_not_injected(): void ] ); - $time = Clock::get()->now(); - $nextMoment = Clock::get()->now(); + $ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123')); + + $ecotoneTestSupport->run('notifications'); + $this->assertCount(0, $notifier->getNotificationsOf('placedOrder')); + + $ecotoneTestSupport->advanceTimeTo(Duration::minutes(2)); + $ecotoneTestSupport->run('notifications'); + + $this->assertCount(1, $notifier->getNotificationsOf('placedOrder')); + } + + public function test_first_change_time_call_allows_any_time(): void + { + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + [OrderService::class, NotificationService::class, CustomNotifier::class], + [new OrderService(), new NotificationService(), new CustomNotifier()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('notifications', true), + ] + ); + + $ecotoneTestSupport->changeTimeTo(new \DateTimeImmutable('2020-01-01 12:00:00')); + + $this->assertEquals('2020-01-01 12:00:00', $ecotoneTestSupport->getServiceFromContainer(EcotoneClockInterface::class)->now()->format('Y-m-d H:i:s')); + } + + public function test_change_time_throws_exception_when_moving_backwards_after_first_setup(): void + { + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + [OrderService::class, NotificationService::class, CustomNotifier::class], + [new OrderService(), new NotificationService(), new CustomNotifier()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('notifications', true), + ] + ); + + $ecotoneTestSupport->changeTimeTo(new \DateTimeImmutable('2025-08-11 17:00:00')); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Cannot move time backwards'); + + $ecotoneTestSupport->changeTimeTo(new \DateTimeImmutable('2025-08-11 16:30:00')); + } + + public function test_time_advances_before_change_time_is_called(): void + { + $clock = new StaticPsrClock(); + + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); + + $this->assertGreaterThan($time1, $time2); + } + + public function test_time_advances_when_constructed_with_now_string(): void + { + $clock = new StaticPsrClock('now'); + + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); + + $this->assertGreaterThan($time1, $time2); + } + + public function test_time_freezes_after_advance_time_with_duration(): void + { + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + [OrderService::class, NotificationService::class, CustomNotifier::class], + [ClockInterface::class => new StaticPsrClock(), new OrderService(), new NotificationService(), new CustomNotifier()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('notifications', true), + ] + ); + + $ecotoneTestSupport->advanceTimeTo(Duration::seconds(1)); + + $clock = $ecotoneTestSupport->getServiceFromContainer(ClockInterface::class); + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); + + $this->assertEquals($time1, $time2); + } + + public function test_time_advances_when_constructed_with_null(): void + { + $clock = new StaticPsrClock(null); + + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); + + $this->assertGreaterThan($time1, $time2); + } + + public function test_time_freezes_after_sleep_is_called_with_fixed_time(): void + { + $clock = new StaticPsrClock('2025-08-11 16:00:00'); + + $clock->sleep(Duration::seconds(1)); + + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); + + $this->assertEquals($time1, $time2); + } + + public function test_time_continues_advancing_after_sleep_when_using_now(): void + { + $clock = new StaticPsrClock('now'); + + $clock->sleep(Duration::seconds(1)); + + $time1 = $clock->now(); + usleep(1000); + $time2 = $clock->now(); - $this->assertGreaterThan($time->getMicrosecond(), $nextMoment->getMicrosecond()); + $this->assertGreaterThan($time1, $time2); } } diff --git a/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/EndpointHeadersInterceptorTest.php b/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/EndpointHeadersInterceptorTest.php index 0473437e7..2ed5d71ec 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/EndpointHeadersInterceptorTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Config/Annotation/ModuleConfiguration/EndpointHeadersInterceptorTest.php @@ -250,4 +250,56 @@ public function test_time_to_live_with_delayed_attribute() $this->assertEquals(1000, $headers[MessageHeaders::DELIVERY_DELAY]); $this->assertEquals($timeToLive, $headers[MessageHeaders::TIME_TO_LIVE]); } + + public function test_delayed_attribute_with_string_containing_utc_offset(): void + { + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [ + AddingMultipleHeaders::class, + ], + [ + AddingMultipleHeaders::class => new AddingMultipleHeaders(), + ], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + testConfiguration: TestConfiguration::createWithDefaults()->withSpyOnChannel('async') + ); + + $command = new stdClass(); + $command->delay = '2030-01-01 12:00:00+02:00'; + $command->timeToLive = 1001; + + $headers = $ecotoneLite + ->sendCommandWithRoutingKey( + 'addHeadersWithExpression', + command: $command, + metadata: [ + 'token' => 123, + ] + ) + ->getRecordedEcotoneMessagesFrom('async')[0]->getHeaders()->headers(); + + $this->assertIsInt($headers[MessageHeaders::DELIVERY_DELAY]); + $this->assertGreaterThan(0, $headers[MessageHeaders::DELIVERY_DELAY]); + } + + public function test_throwing_exception_when_string_without_utc_offset_passed_to_delivery_delay(): void + { + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [AddingMultipleHeaders::class], + [AddingMultipleHeaders::class => new AddingMultipleHeaders()], + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('async'), + ], + testConfiguration: TestConfiguration::createWithDefaults()->withSpyOnChannel('async') + ); + + $command = new stdClass(); + $command->delay = '2025-01-01 12:00:00'; + + $this->expectException(ConfigurationException::class); + + $ecotoneLite->sendCommandWithRoutingKey('addHeadersWithStringDelayExpression', command: $command); + } }