diff --git a/src/WorkflowCore/ServiceCollectionExtensions.cs b/src/WorkflowCore/ServiceCollectionExtensions.cs index f38f44a5b..7f1d5185b 100644 --- a/src/WorkflowCore/ServiceCollectionExtensions.cs +++ b/src/WorkflowCore/ServiceCollectionExtensions.cs @@ -20,6 +20,10 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A var options = new WorkflowOptions(services); setupAction?.Invoke(options); services.AddSingleton(); + if (!services.Any(x => x.ServiceType == typeof(IWorkflowPurger))) + { + services.AddSingleton(sp => (IWorkflowPurger)sp.GetService()); + } services.AddTransient(options.PersistenceFactory); services.AddTransient(options.PersistenceFactory); services.AddTransient(options.PersistenceFactory); @@ -120,4 +124,3 @@ public static IServiceCollection AddWorkflowMiddleware( : services.AddTransient(factory); } } - diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index cb69c8791..2f7139446 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -17,7 +17,7 @@ public interface ISingletonMemoryProvider : IPersistenceProvider /// /// In-memory implementation of IPersistenceProvider for demo and testing purposes /// - public class MemoryPersistenceProvider : ISingletonMemoryProvider + public class MemoryPersistenceProvider : ISingletonMemoryProvider, IWorkflowPurger { private readonly List _instances = new List(); private readonly List _subscriptions = new List(); @@ -286,6 +286,30 @@ public Task ProcessCommands(DateTimeOffset asOf, Func ac { throw new NotImplementedException(); } + + /// + /// Purges only completed or terminated workflows; other status values are rejected. + /// + /// + /// Workflows without a value are not eligible for purge. + /// + public Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default) + { + var olderThanUtc = olderThan.ToUniversalTime(); + if (status != WorkflowStatus.Complete && status != WorkflowStatus.Terminated) + { + throw new ArgumentOutOfRangeException(nameof(status), status, "Only complete or terminated workflows can be purged."); + } + + lock (_instances) + { + _instances.RemoveAll(x => x.Status == status + && x.CompleteTime.HasValue + && x.CompleteTime.Value < olderThanUtc); + } + + return Task.CompletedTask; + } } #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously diff --git a/test/WorkflowCore.UnitTests/Services/MemoryPersistenceProviderFixture.cs b/test/WorkflowCore.UnitTests/Services/MemoryPersistenceProviderFixture.cs index 1bfa2b3f7..e329c93df 100644 --- a/test/WorkflowCore.UnitTests/Services/MemoryPersistenceProviderFixture.cs +++ b/test/WorkflowCore.UnitTests/Services/MemoryPersistenceProviderFixture.cs @@ -1,6 +1,10 @@ using System; +using System.Threading.Tasks; +using FluentAssertions; using WorkflowCore.Interface; +using WorkflowCore.Models; using WorkflowCore.Services; +using Xunit; namespace WorkflowCore.UnitTests.Services { @@ -9,5 +13,63 @@ public class MemoryPersistenceProviderFixture : BasePersistenceFixture private readonly IPersistenceProvider _subject = new MemoryPersistenceProvider(); protected override IPersistenceProvider Subject => _subject; + + private IWorkflowPurger Purger => (IWorkflowPurger)_subject; + + [Fact] + public async Task PurgeWorkflows_should_remove_terminated_instances() + { + var terminatedWorkflow = new WorkflowInstance + { + Status = WorkflowStatus.Terminated, + CreateTime = DateTime.UtcNow.AddDays(-2), + CompleteTime = DateTime.UtcNow.AddDays(-2), + WorkflowDefinitionId = "terminated" + }; + + var activeWorkflow = new WorkflowInstance + { + Status = WorkflowStatus.Runnable, + CreateTime = DateTime.UtcNow.AddDays(-2), + NextExecution = 0, + WorkflowDefinitionId = "active" + }; + + var terminatedId = await _subject.CreateNewWorkflow(terminatedWorkflow); + var activeId = await _subject.CreateNewWorkflow(activeWorkflow); + + await Purger.PurgeWorkflows(WorkflowStatus.Terminated, DateTime.UtcNow.AddDays(-1)); + + var remaining = await _subject.GetWorkflowInstances(new[] { terminatedId, activeId }); + remaining.Should().ContainSingle(x => x.Id == activeId); + } + + [Fact] + public async Task PurgeWorkflows_should_remove_completed_instances() + { + var completedWorkflow = new WorkflowInstance + { + Status = WorkflowStatus.Complete, + CreateTime = DateTime.UtcNow.AddDays(-3), + CompleteTime = DateTime.UtcNow.AddDays(-3), + WorkflowDefinitionId = "completed" + }; + + var activeWorkflow = new WorkflowInstance + { + Status = WorkflowStatus.Runnable, + CreateTime = DateTime.UtcNow.AddDays(-1), + NextExecution = 0, + WorkflowDefinitionId = "active" + }; + + var completedId = await _subject.CreateNewWorkflow(completedWorkflow); + var activeId = await _subject.CreateNewWorkflow(activeWorkflow); + + await Purger.PurgeWorkflows(WorkflowStatus.Complete, DateTime.UtcNow.AddDays(-2)); + + var remaining = await _subject.GetWorkflowInstances(new[] { completedId, activeId }); + remaining.Should().ContainSingle(x => x.Id == activeId); + } } }