diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index 19e5a9808..e2904b281 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -293,10 +293,11 @@ final class DoStuffSubscriber } } ``` + ### Setup and Teardown -Subscribers can have one `setup` and `teardown` method that is executed when the subscription is created or deleted. -For this there are the attributes `Setup` and `Teardown`. The method name itself doesn't matter. +Subscribers can have one `setup` method that is executed when the subscription is created. +For this there is the attributes `Setup`. The method name itself doesn't matter. This is especially helpful for projectors, as they can create the necessary structures for the projection here. ```php @@ -319,12 +320,6 @@ final class ProfileProjector sprintf('CREATE TABLE IF NOT EXISTS %s (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);', self::TABLE), ); } - - #[Teardown] - public function drop(): void - { - $this->connection->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE)); - } } ``` !!! danger @@ -344,6 +339,83 @@ final class ProfileProjector Most databases have a limit on the length of the table/collection name. The limit is usually 64 characters. +### Teardown + +Subscribers can have one `teardown` method that is executed when the subscription is removed. +For this there is the attributes `Teardown`. + +```php +use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\Projector; +use Patchlevel\EventSourcing\Attribute\Setup; +use Patchlevel\EventSourcing\Attribute\Teardown; + +#[Projector(self::TABLE)] +final class ProfileProjector +{ + private const TABLE = 'profile_v1'; + + private Connection $connection; + + #[Teardown] + public function drop(): void + { + $this->connection->executeStatement(sprintf('DROP TABLE IF EXISTS %s;', self::TABLE)); + } +} +``` +!!! danger + + MySQL and MariaDB don't support transactions for DDL statements. + So you must use a different database connection in your projectors, + otherwise you will get an error when the subscription tries to create the table. + +!!! warning + + A teardown can only be performed for a subscription if the subscriber with that subscriber ID still exists. + +!!! note + + You can not mix the `cleanup` method with the `teardown` method. + +### Cleanup + +The cleanup option allows you to delete the subscription from the database after the subscription has finished. +This is especially useful for projectors, + +```php +use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Attribute\Cleanup; +use Patchlevel\EventSourcing\Attribute\Projector; +use Patchlevel\EventSourcing\Attribute\Setup; +use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropIndexTask; + +#[Projector(self::TABLE)] +final class ProfileProjector +{ + private const TABLE = 'profile_v1'; + + private Connection $connection; + + #[Cleanup] + public function drop(): array + { + return [ + new DropIndexTask(self::TABLE) + ]; + } +} +``` + +!!! note + + You can not mix the `cleanup` method with the `teardown` method. + +!!! tip + + You can create your own cleanup tasks and add them to the array. + For more information, see [Cleanup Handler](#cleanup-handler). + ### On Failed The subscription engine has a [retry strategy](#retry-strategy) to retry subscriptions that have an error. @@ -918,7 +990,63 @@ $retryStrategyRepository = new RetryStrategyRepository([ !!! tip You can change the default retry strategy by define the name in the constructor as second parameter. + +### Cleanup Handler + +You can also create your own cleanup tasks. +The subscription engine will call the method `__invoke` on the task object. +For example, you can create a task that drops a collection from a MongoDB database. + +First, create a task class, that holds the collection name. + +```php +final class DropCollection { + public function __construct( + public readonly string $collectionName + ) {} +} +``` + +!!! warning + + The task class must be serializable. It will be stored in the subscription store. + +Then create a handler that supports this task. + +```php +use MongoDb\Database; +use Patchlevel\EventSourcing\Subscription\Cleanup\CleanupHandler; + +final class MongodbCleanupHandler implements CleanupHandler { + public function __construct( + private readonly Database $database + ) {} + + public function __invoke(object $task): void + { + if ($task instanceof DropCollection) { + $this->database->dropCollection($task->collectionName); + } + } + public function supports(object $task): bool + { + return $task instanceof DropCollection; + } +} +``` + +Last step, you need to add the handler to the `DefaultCleaner`, that is used by the subscription engine. + +```php +use Patchlevel\EventSourcing\Subscription\Cleanup\DefaultCleaner; + +$cleaner = new DefaultCleaner([ + new DbalCleanupHandler($projectionConnection), + new MongodbCleanupHandler($mongodbDatabase), +]); +``` + ### Subscriber Accessor The subscriber accessor repository is responsible for providing the subscribers to the subscription engine. @@ -938,30 +1066,41 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([ $subscriber3, ]); ``` + ### Subscription Engine Now we can create the subscription engine and plug together the necessary services. The message loader is needed to load the messages, the Subscription Store to store the subscription state and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy. +Finally, if we want to use the cleanup feature, we need to pass the cleanup handlers. ```php +use Doctrine\DBAL\Connection; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategyRepository; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; +use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DbalCleanupHandler; +use Patchlevel\EventSourcing\Subscription\Cleanup\DefaultCleaner; /** * @var MessageLoader $messageLoader * @var DoctrineSubscriptionStore $subscriptionStore * @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository * @var RetryStrategyRepository $retryStrategyRepository + * @var LoggerInterface $logger + * @var Connection $projectionConnection */ $subscriptionEngine = new DefaultSubscriptionEngine( $messageLoader, $subscriptionStore, $subscriberAccessorRepository, $retryStrategyRepository, // optional, if not set the default retry strategy is used + $logger, // optional + new DefaultCleaner([ + new DbalCleanupHandler($projectionConnection) + ]), // required, if you want to use the cleanup feature ); ``` ### Catch up Subscription Engine diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 2ee97190e..cd4d7fbe2 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -156,18 +156,18 @@ parameters: count: 1 path: src/Subscription/Store/DoctrineSubscriptionStore.php + - + message: '#^Parameter \#9 \$cleanupTasks of class Patchlevel\\EventSourcing\\Subscription\\Subscription constructor expects list\\|null, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Subscription/Store/DoctrineSubscriptionStore.php + - message: '#^Cannot cast mixed to string\.$#' identifier: cast.string count: 3 path: src/Subscription/ThrowableToErrorContextTransformer.php - - - message: '#^Property Patchlevel\\EventSourcing\\Tests\\Benchmark\\BasicImplementation\\ProfileWithCommands\:\:\$id is never read, only written\.$#' - identifier: property.onlyWritten - count: 1 - path: tests/Benchmark/BasicImplementation/ProfileWithCommands.php - - message: '#^Cannot access offset ''name'' on array\\|false\.$#' identifier: offsetAccess.nonOffsetAccessible diff --git a/src/Attribute/Cleanup.php b/src/Attribute/Cleanup.php new file mode 100644 index 000000000..039c9b738 --- /dev/null +++ b/src/Attribute/Cleanup.php @@ -0,0 +1,12 @@ +getName(); } + if ($method->getAttributes(Cleanup::class)) { + if ($cleanupMethod !== null) { + throw new DuplicateCleanupMethod( + $subscriber, + $cleanupMethod, + $method->getName(), + ); + } + + if ($teardownMethod !== null) { + throw new MixedTeardownAndCleanupMethods( + $subscriber, + $teardownMethod, + $method->getName(), + ); + } + + $cleanupMethod = $method->getName(); + } + if (!$method->getAttributes(Teardown::class)) { continue; } @@ -102,6 +124,14 @@ public function metadata(string $subscriber): SubscriberMetadata ); } + if ($cleanupMethod !== null) { + throw new MixedTeardownAndCleanupMethods( + $subscriber, + $method->getName(), + $cleanupMethod, + ); + } + $teardownMethod = $method->getName(); } @@ -118,6 +148,7 @@ public function metadata(string $subscriber): SubscriberMetadata $teardownMethod, $failedMethod, $this->retryStrategy($reflector), + $cleanupMethod, ); $this->subscriberMetadata[$subscriber] = $metadata; diff --git a/src/Metadata/Subscriber/DuplicateCleanupMethod.php b/src/Metadata/Subscriber/DuplicateCleanupMethod.php new file mode 100644 index 000000000..cb508d246 --- /dev/null +++ b/src/Metadata/Subscriber/DuplicateCleanupMethod.php @@ -0,0 +1,25 @@ +connection->createSchemaManager()->dropTable($task->table); + break; + case DropIndexTask::class: + $this->connection->createSchemaManager()->dropIndex($task->index, $task->table); + break; + } + } + + public function supports(object $task): bool + { + return $task instanceof DropTableTask || $task instanceof DropIndexTask; + } +} diff --git a/src/Subscription/Cleanup/Dbal/DropIndexTask.php b/src/Subscription/Cleanup/Dbal/DropIndexTask.php new file mode 100644 index 000000000..227f2c558 --- /dev/null +++ b/src/Subscription/Cleanup/Dbal/DropIndexTask.php @@ -0,0 +1,14 @@ + $handlers */ + public function __construct( + private readonly iterable $handlers, + ) { + } + + public function cleanup(Subscription $subscription): void + { + $tasks = $subscription->cleanupTasks(); + + if (!$tasks) { + return; + } + + foreach ($tasks as $task) { + foreach ($this->handlers as $handler) { + if (!$handler->supports($task)) { + continue; + } + + try { + $handler($task); + } catch (Throwable $exception) { + throw new CleanupFailed( + $subscription->id(), + $task, + $handler::class, + $exception, + ); + } + + continue 2; + } + + throw new NoHandlerForCleanupTask($task); + } + } +} diff --git a/src/Subscription/Cleanup/NoHandlerForCleanupTask.php b/src/Subscription/Cleanup/NoHandlerForCleanupTask.php new file mode 100644 index 000000000..4abb82c22 --- /dev/null +++ b/src/Subscription/Cleanup/NoHandlerForCleanupTask.php @@ -0,0 +1,18 @@ +messageLoader = $messageStore; @@ -528,12 +530,22 @@ function (SubscriptionCollection $subscriptions): Result { $errors = []; foreach ($subscriptions as $subscription) { + if ($subscription->hasCleanupTasks()) { + $error = $this->cleanup($subscription); + + if ($error) { + $errors[] = $error; + } + + continue; + } + $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { $this->logger?->warning( sprintf( - 'Subscription Engine: Subscriber for "%s" to teardown not found, skipped.', + 'Subscription Engine: Subscriber for "%s" to teardown or cleanup not found, skipped.', $subscription->id(), ), ); @@ -630,6 +642,16 @@ function (SubscriptionCollection $subscriptions): Result { continue; } + if ($subscription->hasCleanupTasks()) { + $error = $this->cleanup($subscription); + + if ($error) { + $errors[] = $error; + } + + continue; + } + $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { @@ -977,6 +999,7 @@ private function discoverNewSubscriptions(): void $subscriber->id(), $subscriber->group(), $subscriber->runMode(), + cleanupTasks: $this->cleanupTasks($subscriber), ); if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { @@ -1001,8 +1024,12 @@ private function discoverNewSubscriptions(): void $this->subscriptionManager->flush(); } - private function handleError(Subscription $subscription, Throwable $throwable, Message|null $message = null, int|null $index = null): void - { + private function handleError( + Subscription $subscription, + Throwable $throwable, + Message|null $message = null, + int|null $index = null, + ): void { if ($this->needRollback($subscription)) { $this->rollback($subscription); } @@ -1019,8 +1046,12 @@ private function handleError(Subscription $subscription, Throwable $throwable, M $this->handleFailed($subscription, $throwable, $message, $index); } - private function handleFailed(Subscription $subscription, Throwable $throwable, Message|null $message = null, int|null $index = null): void - { + private function handleFailed( + Subscription $subscription, + Throwable $throwable, + Message|null $message = null, + int|null $index = null, + ): void { if (!$message || $index === null) { $subscription->failed($throwable); $this->subscriptionManager->update($subscription); @@ -1210,4 +1241,60 @@ private function retryStrategy(Subscription $subscription): RetryStrategy return $this->retryStrategyRepository->get($retryStrategy); } + + private function cleanup(Subscription $subscription): Error|null + { + if (!$this->cleaner) { + throw new UnexpectedError('Cleaner is not configured.'); + } + + try { + $this->cleaner->cleanup($subscription); + $this->logger?->debug( + sprintf( + 'Subscription Engine: For Subscription "%s" the cleanup tasks have been executed.', + $subscription->id(), + ), + ); + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'Subscription Engine: Subscription "%s" has an error in the cleanup tasks: %s', + $subscription->id(), + $e->getMessage(), + ), + ); + + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + } + + $this->subscriptionManager->remove($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" removed.', + $subscription->id(), + )); + + return null; + } + + /** @return list|null */ + private function cleanupTasks(SubscriberAccessor $subscriber): array|null + { + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return null; + } + + $method = $subscriber->cleanupMethod(); + + if (!$method) { + return null; + } + + return array_values([...$method()]); + } } diff --git a/src/Subscription/Store/DoctrineSubscriptionStore.php b/src/Subscription/Store/DoctrineSubscriptionStore.php index f81004852..34e86ae09 100644 --- a/src/Subscription/Store/DoctrineSubscriptionStore.php +++ b/src/Subscription/Store/DoctrineSubscriptionStore.php @@ -27,6 +27,8 @@ use function assert; use function json_decode; use function json_encode; +use function serialize; +use function unserialize; use const JSON_THROW_ON_ERROR; @@ -41,6 +43,7 @@ * error_context: string|null, * retry_attempt: int, * last_saved_at: string, + * cleanup_tasks: string|null, * } */ final class DoctrineSubscriptionStore implements LockableSubscriptionStore, DoctrineSchemaConfigurator @@ -139,6 +142,7 @@ public function add(Subscription $subscription): void 'error_context' => $subscriptionError?->errorContext !== null ? json_encode($subscriptionError->errorContext, JSON_THROW_ON_ERROR) : null, 'retry_attempt' => $subscription->retryAttempt(), 'last_saved_at' => $subscription->lastSavedAt(), + 'cleanup_tasks' => $subscription->cleanupTasks() !== null ? serialize($subscription->cleanupTasks()) : null, ], [ 'last_saved_at' => Types::DATETIME_IMMUTABLE, @@ -164,6 +168,7 @@ public function update(Subscription $subscription): void 'error_context' => $subscriptionError?->errorContext !== null ? json_encode($subscriptionError->errorContext, JSON_THROW_ON_ERROR) : null, 'retry_attempt' => $subscription->retryAttempt(), 'last_saved_at' => $subscription->lastSavedAt(), + 'cleanup_tasks' => $subscription->cleanupTasks() !== null ? serialize($subscription->cleanupTasks()) : null, ], [ 'id' => $subscription->id(), @@ -240,6 +245,8 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setNotnull(true); $table->addColumn('last_saved_at', Types::DATETIMETZ_IMMUTABLE) ->setNotnull(true); + $table->addColumn('cleanup_tasks', Types::TEXT) + ->setNotnull(false); $table->setPrimaryKey(['id']); $table->addIndex(['group_name']); @@ -265,6 +272,7 @@ private function createSubscription(array $row): Subscription ) : null, $row['retry_attempt'], self::normalizeDateTime($row['last_saved_at'], $this->connection->getDatabasePlatform()), + $row['cleanup_tasks'] !== null ? unserialize($row['cleanup_tasks']) : null, ); } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php index 4435a3699..0090c36f3 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php @@ -86,6 +86,18 @@ public function teardownMethod(): Closure|null return $this->subscriber->$method(...); } + /** @return Closure():iterable|null */ + public function cleanupMethod(): Closure|null + { + $method = $this->metadata->cleanupMethod; + + if ($method === null) { + return null; + } + + return $this->subscriber->$method(...); + } + /** @return Closure(Message, Throwable):void|null */ public function failedMethod(): Closure|null { diff --git a/src/Subscription/Subscription.php b/src/Subscription/Subscription.php index 5246dc90d..c2e1d56b4 100644 --- a/src/Subscription/Subscription.php +++ b/src/Subscription/Subscription.php @@ -11,6 +11,7 @@ final class Subscription { public const DEFAULT_GROUP = 'default'; + /** @param list|null $cleanupTasks */ public function __construct( private readonly string $id, private readonly string $group = self::DEFAULT_GROUP, @@ -20,6 +21,7 @@ public function __construct( private SubscriptionError|null $error = null, private int $retryAttempt = 0, private DateTimeImmutable|null $lastSavedAt = null, + private array|null $cleanupTasks = null, ) { } @@ -190,4 +192,15 @@ public function updateLastSavedAt(DateTimeImmutable $lastSavedAt): void { $this->lastSavedAt = $lastSavedAt; } + + /** @return list|null */ + public function cleanupTasks(): array|null + { + return $this->cleanupTasks; + } + + public function hasCleanupTasks(): bool + { + return $this->cleanupTasks !== null; + } } diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php new file mode 100644 index 000000000..4aeed5d7e --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php @@ -0,0 +1,81 @@ +tableName()); + $table->addColumn('id', 'string')->setLength(36); + $table->addColumn('name', 'string')->setLength(255); + $table->setPrimaryKey(['id']); + + $this->connection->createSchemaManager()->createTable($table); + } + + #[Cleanup] + public function drop(): Generator + { + yield new DropTableTask($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->executeStatement( + 'INSERT INTO ' . $this->tableName() . ' (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + private function tableName(): string + { + return 'projection_' . self::TABLE_NAME; + } + + public function beginBatch(): void + { + $this->connection->beginTransaction(); + } + + public function commitBatch(): void + { + $this->connection->commit(); + } + + public function rollbackBatch(): void + { + $this->connection->rollBack(); + } + + public function forceCommit(): bool + { + return false; + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 9da41172f..0f5503207 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -21,6 +21,9 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; +use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DbalCleanupHandler; +use Patchlevel\EventSourcing\Subscription\Cleanup\Dbal\DropTableTask; +use Patchlevel\EventSourcing\Subscription\Cleanup\DefaultCleaner; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; @@ -42,6 +45,7 @@ use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjectionWithCleanup; use PHPUnit\Framework\Attributes\CoversNothing; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -1177,6 +1181,175 @@ public function testBlueGreenDeploymentRollback(): void ); } + public function testCleanup(): void + { + // Test Setup + + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $firstEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileProjectionWithCleanup($this->projectionConnection)]), + ); + + // Deploy first version + + $firstEngine->setup(); + $firstEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + ], + $firstEngine->subscriptions(), + ); + + // Run first version + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $firstEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + ], + $firstEngine->subscriptions(), + ); + + // deploy second version + + $secondEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileNewProjection($this->projectionConnection)]), + cleaner: new DefaultCleaner([ + new DbalCleanupHandler( + $this->projectionConnection, + ), + ]), + ); + + $secondEngine->setup(); + $secondEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // switch traffic + + $secondEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + // shutdown second version (with cleanup) + + $secondEngine->teardown(); + + self::assertEquals( + [ + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + self::assertFalse( + $this->projectionConnection->createSchemaManager()->tableExists('projection_profile_1'), + ); + } + public function testPipeline(): void { $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00'));