Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ public function test_delaying_the_message_with_native_clock()
->build()
);

$ecotoneLite->waitTill(TimeSpan::withSeconds(1));
$ecotoneLite->advanceTimeTo(Duration::seconds(1));

$this->assertNull($messageChannel->receive());

$ecotoneLite->waitTill(TimeSpan::withSeconds(3));
$ecotoneLite->advanceTimeTo(Duration::seconds(3));

$this->assertNotNull($messageChannel->receive());
}
Expand Down Expand Up @@ -289,11 +289,11 @@ public function test_delaying_the_message_with_native_clock_using_date_time()

/** @var EcotoneClockInterface $clock */
$clock = $ecotoneLite->getServiceFromContainer(EcotoneClockInterface::class);
$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(1)));
$ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(1)));

$this->assertNull($messageChannel->receive());

$ecotoneLite->waitTill($clock->now()->add(Duration::seconds(3)));
$ecotoneLite->changeTimeTo($clock->now()->add(Duration::seconds(3)));

$this->assertNotNull($messageChannel->receive());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Lite\Test;

use DateTimeImmutable;
use Ecotone\Messaging\Config\ConfiguredMessagingSystem;
use Ecotone\Messaging\Config\Container\GatewayProxyMethodReference;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
Expand All @@ -12,12 +13,16 @@
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\MessagePublisher;
use Ecotone\Messaging\Scheduling\Duration;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\DistributedBus;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\QueryBus;
use Ecotone\Test\StaticPsrClock;
use InvalidArgumentException;
use Psr\Clock\ClockInterface;

/**
* licence Apache-2.0
Expand Down Expand Up @@ -129,4 +134,51 @@ public function replaceWith(ConfiguredMessagingSystem $messagingSystem): void
{
$this->configuredMessagingSystem->replaceWith($messagingSystem);
}

public function changeTime(DateTimeImmutable|Duration $time): self
{
$psrClock = $this->getStaticPsrClockFromContainer();

if ($time instanceof Duration) {
$psrClock->setCurrentTime(
DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$time->inMicroseconds()} microseconds")
);
return $this;
}

if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) {
throw new InvalidArgumentException(
sprintf(
'Cannot move time backwards. Current clock time: %s, requested time: %s',
$psrClock->now()->format('Y-m-d H:i:s.u'),
$time->format('Y-m-d H:i:s.u')
)
);
}

$psrClock->setCurrentTime($time);

return $this;
}

private function getStaticPsrClockFromContainer(): StaticPsrClock
{
try {
$psrClock = $this->configuredMessagingSystem->getServiceFromContainer(ClockInterface::class);
} catch (\Throwable) {
throw new InvalidArgumentException(
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
);
}

if (! $psrClock instanceof StaticPsrClock) {
throw new InvalidArgumentException(
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
);
}

return $psrClock;
}
}
64 changes: 55 additions & 9 deletions packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Lite\Test;

use DateTimeImmutable;
use DateTimeInterface;
use Ecotone\EventSourcing\EventStore;
use Ecotone\EventSourcing\ProjectionManager;
Expand All @@ -18,6 +19,7 @@
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\PollableChannel;
use Ecotone\Messaging\Scheduling\Clock;
use Ecotone\Messaging\Scheduling\Duration;
use Ecotone\Messaging\Scheduling\EcotoneClockInterface;
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;
Expand All @@ -32,6 +34,8 @@
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\QueryBus;
use Ecotone\Projecting\ProjectionRegistry;
use Ecotone\Test\StaticPsrClock;
use InvalidArgumentException;

/**
* @template T
Expand Down Expand Up @@ -191,23 +195,65 @@ public function getEventStreamEvents(string $streamName): array
return $this->getGateway(EventStore::class)->load($streamName);
}

public function waitTill(TimeSpan|DateTimeInterface $time): self
public function changeTimeTo(DateTimeImmutable $time): self
{
if ($time instanceof DateTimeInterface) {
if ($time < $this->clock->now()) {
throw new MessagingException("Time to wait is in the past. Now: {$this->clock->now()}, time to wait: {$time}");
}
$psrClock = $this->getStaticPsrClockFromContainer();

if ($psrClock->hasBeenChanged() && $time <= $psrClock->now()) {
throw new InvalidArgumentException(
\sprintf(
'Cannot move time backwards. Current clock time: %s, requested time: %s',
$psrClock->now()->format('Y-m-d H:i:s.u'),
$time->format('Y-m-d H:i:s.u')
)
);
}

$this->clock->sleep(
$time instanceof TimeSpan
? $time->toDuration()
: TimeSpan::fromDateInterval($time->diff($this->clock->now()))->toDuration()
$psrClock->setCurrentTime($time);

return $this;
}

public function advanceTimeTo(Duration $duration): self
{
$psrClock = $this->getStaticPsrClockFromContainer();
$psrClock->setCurrentTime(
DateTimeImmutable::createFromInterface($psrClock->now())->modify("+{$duration->inMicroseconds()} microseconds")
);

return $this;
}

private function getStaticPsrClockFromContainer(): StaticPsrClock
{
try {
/** @var Clock $clock */
$clock = $this->configuredMessagingSystem->getServiceFromContainer(EcotoneClockInterface::class);
} catch (\Throwable) {
throw new InvalidArgumentException(
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
);
}

if (! $clock instanceof Clock) {
throw new InvalidArgumentException(
'Changing time is only possible when using Clock as the EcotoneClockInterface. ' .
'Register EcotoneClockInterface::class => new Clock(new StaticPsrClock()) in your container services.'
);
}

$clock = $clock->internalClock();
if (! $clock instanceof StaticPsrClock) {
throw new InvalidArgumentException(
'Changing time is only possible when using StaticPsrClock as the ClockInterface. ' .
'Register ClockInterface::class => new StaticPsrClock() in your container services.'
);
}

return $clock;
}

/**
* @param Event[]|object[]|array[] $events
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration\EndpointHeaders;

use DateTimeImmutable;
use DateTimeInterface;
use Ecotone\Messaging\Attribute\Endpoint\AddHeader;
use Ecotone\Messaging\Attribute\Endpoint\Delayed;

use function is_string;
use function preg_match;
use function str_ends_with;
use Ecotone\Messaging\Attribute\Endpoint\Priority;
use Ecotone\Messaging\Attribute\Endpoint\RemoveHeader;
use Ecotone\Messaging\Attribute\Endpoint\TimeToLive;
Expand Down Expand Up @@ -62,6 +67,10 @@ public function addMetadata(Message $message, ?AddHeader $addHeader, ?Delayed $d
]);
}

if (is_string($metadata[MessageHeaders::DELIVERY_DELAY])) {
$metadata[MessageHeaders::DELIVERY_DELAY] = $this->parseDateTimeStringWithRequiredOffset($metadata[MessageHeaders::DELIVERY_DELAY]);
}

$type = Type::createFromVariable($metadata[MessageHeaders::DELIVERY_DELAY]);
if (! $type->isCompatibleWith(UnionType::createWith([
Type::int(),
Expand Down Expand Up @@ -120,4 +129,20 @@ public function getDefinition(): Definition
Reference::to(ExpressionEvaluationService::REFERENCE),
]);
}

private function parseDateTimeStringWithRequiredOffset(string $dateTimeString): DateTimeImmutable
{
if (! $this->hasUtcOffset($dateTimeString)) {
throw ConfigurationException::create("Delivery delay string '{$dateTimeString}' must contain a UTC offset (e.g., '+02:00' or 'Z'). Dates without timezone information are ambiguous.");
}

return new DateTimeImmutable($dateTimeString);
}

private function hasUtcOffset(string $dateTimeString): bool
{
return preg_match('/[+-]\d{2}:\d{2}$/', $dateTimeString) === 1
|| preg_match('/[+-]\d{4}$/', $dateTimeString) === 1
|| str_ends_with($dateTimeString, 'Z');
}
}
5 changes: 5 additions & 0 deletions packages/Ecotone/src/Messaging/Scheduling/Clock.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public static function get(): EcotoneClockInterface
return self::$globalClock ?? new self(self::defaultClock());
}

public function internalClock(): PsrClockInterface
{
return $this->clock;
}

public function now(): DatePoint
{
$now = $this->clock->now();
Expand Down
36 changes: 30 additions & 6 deletions packages/Ecotone/src/Test/StaticPsrClock.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,46 @@
*/
final class StaticPsrClock implements ClockInterface, SleepInterface
{
private Duration $sleepDuration;
private bool $hasBeenChanged = false;
private ?DateTimeImmutable $now = null;

public function __construct(private ?string $now = null)
public function __construct(?string $now = null)
{
$this->sleepDuration = Duration::zero();
$this->now = ($now === null || $now === 'now') ? null : new DateTimeImmutable($now);
}

public function now(): DateTimeImmutable
{
$now = $this->now === null ? new DateTimeImmutable() : new DateTimeImmutable($this->now);
if ($this->now !== null) {
return $this->now;
}

return $now->modify("+{$this->sleepDuration->zeroIfNegative()->inMicroseconds()} microseconds");
return new DateTimeImmutable('now');
}

public function sleep(Duration $duration): void
{
$this->sleepDuration = $this->sleepDuration->add($duration);
if ($duration->isNegativeOrZero()) {
return;
}

if ($this->now === null) {

usleep($duration->inMicroseconds());
return;
}

$this->now = $this->now()->modify("+{$duration->inMicroseconds()} microseconds");
}

public function hasBeenChanged(): bool
{
return $this->hasBeenChanged;
}

public function setCurrentTime(DateTimeImmutable $time): void
{
$this->now = $time;
$this->hasBeenChanged = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ public function testKeepTtlHeader(): void
{

}

#[Delayed(expression: 'payload.delay')]
#[Asynchronous('async')]
#[CommandHandler('addHeadersWithStringDelayExpression', endpointId: 'addHeadersWithStringDelayExpressionEndpoint')]
public function withStringDelayExpression(): void
{

}
}
Loading