Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 147 additions & 8 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,11 @@ final class DoStuffSubscriber
}
}
```

### Setup and Teardown

Subscribers can have one `setup` and `teardown` method that is executed when the subscription is created or deleted.
For this there are the attributes `Setup` and `Teardown`. The method name itself doesn't matter.
Subscribers can have one `setup` method that is executed when the subscription is created.
For this there is the attributes `Setup`. The method name itself doesn't matter.
This is especially helpful for projectors, as they can create the necessary structures for the projection here.

```php
Expand All @@ -319,12 +320,6 @@ final class ProfileProjector
sprintf('CREATE TABLE IF NOT EXISTS %s (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);', self::TABLE),
);
}

#[Teardown]
public function drop(): void
{
$this->connection->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE));
}
}
```
!!! danger
Expand All @@ -344,6 +339,83 @@ final class ProfileProjector
Most databases have a limit on the length of the table/collection name.
The limit is usually 64 characters.

### Teardown

Subscribers can have one `teardown` method that is executed when the subscription is removed.
For this there is the attributes `Teardown`.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Teardown;

#[Projector(self::TABLE)]
final class ProfileProjector
{
private const TABLE = 'profile_v1';

private Connection $connection;

#[Teardown]
public function drop(): void
{
$this->connection->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE));
}
}
```
!!! danger

MySQL and MariaDB don't support transactions for DDL statements.
So you must use a different database connection in your projectors,
otherwise you will get an error when the subscription tries to create the table.

!!! warning

A teardown can only be performed for a subscription if the subscriber with that subscriber ID still exists.

!!! note

You can not mix the `cleanup` method with the `teardown` method.

### Cleanup

The cleanup option allows you to delete the subscription from the database after the subscription has finished.
This is especially useful for projectors,

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Cleanup;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropIndexTask;

#[Projector(self::TABLE)]
final class ProfileProjector
{
private const TABLE = 'profile_v1';

private Connection $connection;

#[Cleanup]
public function drop(): array
{
return [
new DropIndexTask(self::TABLE)
];
}
}
```

!!! note

You can not mix the `cleanup` method with the `teardown` method.

!!! tip

You can create your own cleanup tasks and add them to the array.
For more information, see [Cleanup Handler](#cleanup-handler).

### On Failed

The subscription engine has a [retry strategy](#retry-strategy) to retry subscriptions that have an error.
Expand Down Expand Up @@ -918,7 +990,63 @@ $retryStrategyRepository = new RetryStrategyRepository([
!!! tip

You can change the default retry strategy by define the name in the constructor as second parameter.

### Cleanup Handler

You can also create your own cleanup tasks.
The subscription engine will call the method `__invoke` on the task object.
For example, you can create a task that drops a collection from a MongoDB database.

First, create a task class, that holds the collection name.

```php
final class DropCollection {
public function __construct(
public readonly string $collectionName
) {}
}
```

!!! warning

The task class must be serializable. It will be stored in the subscription store.

Then create a handler that supports this task.

```php
use MongoDb\Database;
use Patchlevel\EventSourcing\Subscription\Cleanup\CleanupHandler;

final class MongodbCleanupHandler implements CleanupHandler {
public function __construct(
private readonly Database $database
) {}

public function __invoke(object $task): void
{
if ($task instanceof DropCollection) {
$this->database->dropCollection($task->collectionName);
}
}

public function supports(object $task): bool
{
return $task instanceof DropCollection;
}
}
```

Last step, you need to add the handler to the `DefaultCleaner`, that is used by the subscription engine.

```php
use Patchlevel\EventSourcing\Subscription\Cleanup\DefaultCleaner;

$cleaner = new DefaultCleaner([
new DbalCleanupHandler($projectionConnection),
new MongodbCleanupHandler($mongodbDatabase),
]);
```

### Subscriber Accessor

The subscriber accessor repository is responsible for providing the subscribers to the subscription engine.
Expand All @@ -938,30 +1066,41 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([
$subscriber3,
]);
```

### Subscription Engine

Now we can create the subscription engine and plug together the necessary services.
The message loader is needed to load the messages, the Subscription Store to store the subscription state
and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy.
Finally, if we want to use the cleanup feature, we need to pass the cleanup handlers.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DbalCleanupHandler;
use Patchlevel\EventSourcing\Subscription\Cleanup\DefaultCleaner;

/**
* @var MessageLoader $messageLoader
* @var DoctrineSubscriptionStore $subscriptionStore
* @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository
* @var RetryStrategyRepository $retryStrategyRepository
* @var LoggerInterface $logger
* @var Connection $projectionConnection
*/
$subscriptionEngine = new DefaultSubscriptionEngine(
$messageLoader,
$subscriptionStore,
$subscriberAccessorRepository,
$retryStrategyRepository, // optional, if not set the default retry strategy is used
$logger, // optional
new DefaultCleaner([
new DbalCleanupHandler($projectionConnection)
]), // required, if you want to use the cleanup feature
);
```
### Catch up Subscription Engine
Expand Down
12 changes: 6 additions & 6 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,18 @@ parameters:
count: 1
path: src/Subscription/Store/DoctrineSubscriptionStore.php

-
message: '#^Parameter \#9 \$cleanupTasks of class Patchlevel\\EventSourcing\\Subscription\\Subscription constructor expects list\<object\>\|null, mixed given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/Store/DoctrineSubscriptionStore.php

-
message: '#^Cannot cast mixed to string\.$#'
identifier: cast.string
count: 3
path: src/Subscription/ThrowableToErrorContextTransformer.php

-
message: '#^Property Patchlevel\\EventSourcing\\Tests\\Benchmark\\BasicImplementation\\ProfileWithCommands\:\:\$id is never read, only written\.$#'
identifier: property.onlyWritten
count: 1
path: tests/Benchmark/BasicImplementation/ProfileWithCommands.php

-
message: '#^Cannot access offset ''name'' on array\<string, mixed\>\|false\.$#'
identifier: offsetAccess.nonOffsetAccessible
Expand Down
12 changes: 12 additions & 0 deletions src/Attribute/Cleanup.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD)]
final class Cleanup
{
}
31 changes: 31 additions & 0 deletions src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use Patchlevel\EventSourcing\Attribute\Cleanup;
use Patchlevel\EventSourcing\Attribute\OnFailed;
use Patchlevel\EventSourcing\Attribute\RetryStrategy;
use Patchlevel\EventSourcing\Attribute\Setup;
Expand Down Expand Up @@ -45,6 +46,7 @@ public function metadata(string $subscriber): SubscriberMetadata
$subscribeMethods = [];
$setupMethod = null;
$teardownMethod = null;
$cleanupMethod = null;
$failedMethod = null;

foreach ($methods as $method) {
Expand Down Expand Up @@ -90,6 +92,26 @@ public function metadata(string $subscriber): SubscriberMetadata
$setupMethod = $method->getName();
}

if ($method->getAttributes(Cleanup::class)) {
if ($cleanupMethod !== null) {
throw new DuplicateCleanupMethod(
$subscriber,
$cleanupMethod,
$method->getName(),
);
}

if ($teardownMethod !== null) {
throw new MixedTeardownAndCleanupMethods(
$subscriber,
$teardownMethod,
$method->getName(),
);
}

$cleanupMethod = $method->getName();
}

if (!$method->getAttributes(Teardown::class)) {
continue;
}
Expand All @@ -102,6 +124,14 @@ public function metadata(string $subscriber): SubscriberMetadata
);
}

if ($cleanupMethod !== null) {
throw new MixedTeardownAndCleanupMethods(
$subscriber,
$method->getName(),
$cleanupMethod,
);
}

$teardownMethod = $method->getName();
}

Expand All @@ -118,6 +148,7 @@ public function metadata(string $subscriber): SubscriberMetadata
$teardownMethod,
$failedMethod,
$this->retryStrategy($reflector),
$cleanupMethod,
);

$this->subscriberMetadata[$subscriber] = $metadata;
Expand Down
25 changes: 25 additions & 0 deletions src/Metadata/Subscriber/DuplicateCleanupMethod.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use Patchlevel\EventSourcing\Metadata\MetadataException;

use function sprintf;

final class DuplicateCleanupMethod extends MetadataException
{
/** @param class-string $subscriber */
public function __construct(string $subscriber, string $fistMethod, string $secondMethod)
{
parent::__construct(
sprintf(
'Two methods "%s" and "%s" on the subscriber "%s" have been marked as "cleanup" methods. Only one method can be defined like this.',
$fistMethod,
$secondMethod,
$subscriber,
),
);
}
}
27 changes: 27 additions & 0 deletions src/Metadata/Subscriber/MixedTeardownAndCleanupMethods.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Metadata\Subscriber;

use Patchlevel\EventSourcing\Metadata\MetadataException;

use function sprintf;

final class MixedTeardownAndCleanupMethods extends MetadataException
{
public function __construct(
string $subscriber,
string $teardownMethod,
string $cleanupMethod,
) {
parent::__construct(
sprintf(
'The subscriber "%s" has a "teardown" method "%s" and a "cleanup" method "%s". Only one of them can be defined.',
$subscriber,
$teardownMethod,
$cleanupMethod,
),
);
}
}
1 change: 1 addition & 0 deletions src/Metadata/Subscriber/SubscriberMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public function __construct(
public readonly string|null $teardownMethod = null,
public readonly string|null $failedMethod = null,
public readonly string|null $retryStrategy = null,
public readonly string|null $cleanupMethod = null,
) {
}
}
12 changes: 12 additions & 0 deletions src/Subscription/Cleanup/Cleaner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Cleanup;

use Patchlevel\EventSourcing\Subscription\Subscription;

interface Cleaner
{
public function cleanup(Subscription $subscription): void;
}
Loading
Loading