Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
var options = new WorkflowOptions(services);
setupAction?.Invoke(options);
services.AddSingleton<ISingletonMemoryProvider, MemoryPersistenceProvider>();
if (!services.Any(x => x.ServiceType == typeof(IWorkflowPurger)))
{
services.AddSingleton<IWorkflowPurger>(sp => (IWorkflowPurger)sp.GetService<ISingletonMemoryProvider>());
}
services.AddTransient<IPersistenceProvider>(options.PersistenceFactory);
services.AddTransient<IWorkflowRepository>(options.PersistenceFactory);
services.AddTransient<ISubscriptionRepository>(options.PersistenceFactory);
Expand Down Expand Up @@ -120,4 +124,3 @@ public static IServiceCollection AddWorkflowMiddleware<TMiddleware>(
: services.AddTransient<IWorkflowMiddleware, TMiddleware>(factory);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public interface ISingletonMemoryProvider : IPersistenceProvider
/// <summary>
/// In-memory implementation of IPersistenceProvider for demo and testing purposes
/// </summary>
public class MemoryPersistenceProvider : ISingletonMemoryProvider
public class MemoryPersistenceProvider : ISingletonMemoryProvider, IWorkflowPurger
{
private readonly List<WorkflowInstance> _instances = new List<WorkflowInstance>();
private readonly List<EventSubscription> _subscriptions = new List<EventSubscription>();
Expand Down Expand Up @@ -286,6 +286,30 @@ public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> ac
{
throw new NotImplementedException();
}

/// <summary>
/// Purges only completed or terminated workflows; other status values are rejected.
/// </summary>
/// <remarks>
/// Workflows without a <see cref="WorkflowInstance.CompleteTime"/> value are not eligible for purge.
/// </remarks>
public Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
{
var olderThanUtc = olderThan.ToUniversalTime();
if (status != WorkflowStatus.Complete && status != WorkflowStatus.Terminated)
{
throw new ArgumentOutOfRangeException(nameof(status), status, "Only complete or terminated workflows can be purged.");
}

lock (_instances)
{
_instances.RemoveAll(x => x.Status == status
&& x.CompleteTime.HasValue
&& x.CompleteTime.Value < olderThanUtc);
}

return Task.CompletedTask;
}
}

#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Threading.Tasks;
using FluentAssertions;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Services;
using Xunit;

namespace WorkflowCore.UnitTests.Services
{
Expand All @@ -9,5 +13,63 @@ public class MemoryPersistenceProviderFixture : BasePersistenceFixture
private readonly IPersistenceProvider _subject = new MemoryPersistenceProvider();

protected override IPersistenceProvider Subject => _subject;

private IWorkflowPurger Purger => (IWorkflowPurger)_subject;

[Fact]
public async Task PurgeWorkflows_should_remove_terminated_instances()
{
var terminatedWorkflow = new WorkflowInstance
{
Status = WorkflowStatus.Terminated,
CreateTime = DateTime.UtcNow.AddDays(-2),
CompleteTime = DateTime.UtcNow.AddDays(-2),
WorkflowDefinitionId = "terminated"
};

var activeWorkflow = new WorkflowInstance
{
Status = WorkflowStatus.Runnable,
CreateTime = DateTime.UtcNow.AddDays(-2),
NextExecution = 0,
WorkflowDefinitionId = "active"
};

var terminatedId = await _subject.CreateNewWorkflow(terminatedWorkflow);
var activeId = await _subject.CreateNewWorkflow(activeWorkflow);

await Purger.PurgeWorkflows(WorkflowStatus.Terminated, DateTime.UtcNow.AddDays(-1));

var remaining = await _subject.GetWorkflowInstances(new[] { terminatedId, activeId });
remaining.Should().ContainSingle(x => x.Id == activeId);
}

[Fact]
public async Task PurgeWorkflows_should_remove_completed_instances()
{
var completedWorkflow = new WorkflowInstance
{
Status = WorkflowStatus.Complete,
CreateTime = DateTime.UtcNow.AddDays(-3),
CompleteTime = DateTime.UtcNow.AddDays(-3),
WorkflowDefinitionId = "completed"
};

var activeWorkflow = new WorkflowInstance
{
Status = WorkflowStatus.Runnable,
CreateTime = DateTime.UtcNow.AddDays(-1),
NextExecution = 0,
WorkflowDefinitionId = "active"
};

var completedId = await _subject.CreateNewWorkflow(completedWorkflow);
var activeId = await _subject.CreateNewWorkflow(activeWorkflow);

await Purger.PurgeWorkflows(WorkflowStatus.Complete, DateTime.UtcNow.AddDays(-2));

var remaining = await _subject.GetWorkflowInstances(new[] { completedId, activeId });
remaining.Should().ContainSingle(x => x.Id == activeId);
}
}
}