Skip to content

Commit 9f4af44

Browse files
authored
Merge pull request #1 from PejmanNik/improve-api
Enhance orchestration and init entity support
2 parents 7a775d5 + 6c19b41 commit 9f4af44

10 files changed

+278
-19
lines changed

samples/WebAPI/BurgerApi.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public override async Task<string> RunAsync(
5151
};
5252

5353
var results = await Task.WhenAll(tasks);
54-
54+
5555
// 2. Assemble the Burger
5656
var burger = await context.CallAssembleBurgerActivityAsync(results);
5757

src/DurableTask.AspNetCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<RepositoryUrl>https://github.com/PejmanNik/durabletask-dotnet-aspnet</RepositoryUrl>
1515
<PackageReadmeFile>README.md</PackageReadmeFile>
1616
<RepositoryType>git</RepositoryType>
17-
<Version>0.0.1</Version>
17+
<Version>0.0.2</Version>
1818
</PropertyGroup>
1919

2020
<ItemGroup>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using Microsoft.DurableTask.Entities;
2+
3+
namespace DurableTask.AspNetCore.Extensions;
4+
5+
internal static class EntityInstanceIdExtensions
6+
{
7+
public static Core.Entities.EntityId ToCore(this EntityInstanceId id)
8+
{
9+
return new Core.Entities.EntityId(id.Name, id.Key);
10+
}
11+
12+
public static EntityInstanceId ToExtended(this Core.Entities.EntityId id)
13+
{
14+
return new EntityInstanceId(id.Name, id.Key);
15+
}
16+
}

src/Extensions/OrchestrationStatusExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace DurableTask.AspNetCore.Extensions;
55

66
internal static class OrchestrationStatusExtensions
77
{
8-
public static OrchestrationStatus ToCoreStatus(this OrchestrationRuntimeStatus status)
8+
public static OrchestrationStatus ToCore(this OrchestrationRuntimeStatus status)
99
{
1010
#pragma warning disable CS0618 // Type or member is obsolete
1111
return status switch
@@ -23,7 +23,7 @@ public static OrchestrationStatus ToCoreStatus(this OrchestrationRuntimeStatus s
2323
#pragma warning restore CS0618 // Type or member is obsolete
2424
}
2525

26-
public static OrchestrationRuntimeStatus ToRuntimeStatus(this OrchestrationStatus status)
26+
public static OrchestrationRuntimeStatus ToExtended(this OrchestrationStatus status)
2727
{
2828
#pragma warning disable CS0618 // Type or member is obsolete
2929
return status switch
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace DurableTask.AspNetCore.Extensions;
2+
3+
internal static class PurgeResultExtensions
4+
{
5+
public static Microsoft.DurableTask.Client.PurgeResult ToExpanded(this DurableTask.Core.PurgeResult result)
6+
{
7+
return new Microsoft.DurableTask.Client.PurgeResult(
8+
result.DeletedInstanceCount);
9+
}
10+
}

src/Extensions/ServiceCollectionExtensions.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,37 @@ public static IServiceCollection AddSelfHostedDurableTaskHub<TIOrchestrationServ
1515
TIOrchestrationService orchestrationService
1616
) where TIOrchestrationService : IOrchestrationService, IOrchestrationServiceClient
1717
{
18-
return services.AddSelfHostedDurableTaskHub(orchestrationService, orchestrationService);
18+
return services.AddSelfHostedDurableTaskHub((_) => orchestrationService, (_) => orchestrationService);
1919
}
2020

21-
public static IServiceCollection AddSelfHostedDurableTaskHub(
21+
public static IServiceCollection AddSelfHostedDurableTaskHub<TIOrchestrationService>(
22+
this IServiceCollection services,
23+
Func<IServiceProvider, TIOrchestrationService> orchestrationServiceFactory)
24+
where TIOrchestrationService : IOrchestrationService, IOrchestrationServiceClient
25+
{
26+
return services.AddSelfHostedDurableTaskHub(orchestrationServiceFactory, orchestrationServiceFactory);
27+
}
28+
29+
public static IServiceCollection AddSelfHostedDurableTaskHub<TIOrchestrationService, TIOrchestrationServiceClient>(
2230
this IServiceCollection services,
23-
IOrchestrationService orchestrationService,
24-
IOrchestrationServiceClient orchestrationServiceClient)
31+
Func<IServiceProvider, TIOrchestrationService> orchestrationServiceFactory,
32+
Func<IServiceProvider, TIOrchestrationServiceClient> orchestrationServiceClientFactory)
33+
where TIOrchestrationService : IOrchestrationService
34+
where TIOrchestrationServiceClient : IOrchestrationServiceClient
2535
{
2636
ArgumentNullException.ThrowIfNull(services, nameof(services));
27-
ArgumentNullException.ThrowIfNull(orchestrationService, nameof(orchestrationService));
28-
ArgumentNullException.ThrowIfNull(orchestrationServiceClient, nameof(orchestrationServiceClient));
37+
ArgumentNullException.ThrowIfNull(orchestrationServiceFactory, nameof(orchestrationServiceFactory));
38+
ArgumentNullException.ThrowIfNull(orchestrationServiceClientFactory, nameof(orchestrationServiceClientFactory));
2939

3040
return services
3141
.AddSingleton<IWorkerDispatcherMiddleware, WorkerDispatcherMiddleware>()
3242
.AddSingleton((sp) => new TaskHubClient(
33-
orchestrationServiceClient,
43+
orchestrationServiceClientFactory(sp),
3444
dataConverter: BuildDataConverter(sp),
3545
loggerFactory: sp.GetService<ILoggerFactory>())
3646
)
3747
.AddSingleton((sp) => new TaskHubWorker(
38-
orchestrationService,
48+
orchestrationServiceFactory(sp),
3949
orchestrationObjectManager: new ShimOrchestrationObjectManager(),
4050
activityObjectManager: new ShimActivityObjectManager(),
4151
loggerFactory: sp.GetService<ILoggerFactory>())
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
using DurableTask.AspNetCore.Extensions;
2+
using DurableTask.Core;
3+
using DurableTask.Core.Entities;
4+
using Microsoft.DurableTask;
5+
using Microsoft.DurableTask.Client;
6+
using Microsoft.DurableTask.Client.Entities;
7+
using Microsoft.DurableTask.Entities;
8+
using System.Diagnostics.CodeAnalysis;
9+
10+
namespace DurableTask.AspNetCore;
11+
12+
internal sealed class SelfHostedDurableEntityClient : DurableEntityClient
13+
{
14+
private readonly IOrchestrationServiceClient serviceClient;
15+
private readonly EntityBackendQueries entityBackendQueries;
16+
private readonly DataConverter dataConverter;
17+
18+
public SelfHostedDurableEntityClient(
19+
IOrchestrationServiceClient serviceClient,
20+
EntityBackendQueries entityBackendQueries,
21+
DataConverter dataConverter,
22+
string name) : base(name)
23+
{
24+
this.serviceClient = serviceClient;
25+
this.entityBackendQueries = entityBackendQueries;
26+
this.dataConverter = dataConverter;
27+
}
28+
29+
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest? request = null, bool continueUntilComplete = true, CancellationToken cancellation = default)
30+
{
31+
var result = await entityBackendQueries
32+
.CleanEntityStorageAsync(new EntityBackendQueries.CleanEntityStorageRequest()
33+
{
34+
RemoveEmptyEntities = request?.RemoveEmptyEntities ?? true,
35+
ReleaseOrphanedLocks = request?.ReleaseOrphanedLocks ?? true,
36+
ContinuationToken = request?.ContinuationToken,
37+
}, cancellation);
38+
39+
return new CleanEntityStorageResult()
40+
{
41+
ContinuationToken = result.ContinuationToken,
42+
EmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
43+
OrphanedLocksReleased = result.OrphanedLocksReleased,
44+
};
45+
}
46+
47+
public override AsyncPageable<EntityMetadata> GetAllEntitiesAsync(EntityQuery? filter = null)
48+
{
49+
return GetAllEntitiesAsync(m => Convert(m), filter);
50+
}
51+
52+
public override AsyncPageable<EntityMetadata<T>> GetAllEntitiesAsync<T>(EntityQuery? filter = null)
53+
{
54+
return GetAllEntitiesAsync(m => Convert<T>(m), filter);
55+
}
56+
57+
public override async Task<EntityMetadata?> GetEntityAsync(
58+
EntityInstanceId id,
59+
bool includeState = true,
60+
CancellationToken cancellation = default)
61+
{
62+
var entity = await entityBackendQueries.GetEntityAsync(
63+
id.ToCore(), includeState, false, cancellation);
64+
65+
return Convert(entity);
66+
}
67+
68+
public override async Task<EntityMetadata<T>?> GetEntityAsync<T>(
69+
EntityInstanceId id,
70+
bool includeState = true,
71+
CancellationToken cancellation = default)
72+
{
73+
var entity = await entityBackendQueries.GetEntityAsync(
74+
id.ToCore(), includeState, false, cancellation);
75+
76+
return Convert<T>(entity);
77+
}
78+
79+
public override Task SignalEntityAsync(
80+
EntityInstanceId id,
81+
string operationName,
82+
object? input = null,
83+
SignalEntityOptions? options = null,
84+
CancellationToken cancellation = default)
85+
{
86+
var scheduledTime = options?.SignalTime;
87+
var serializedInput = dataConverter.Serialize(input);
88+
89+
var eventToSend = ClientEntityHelpers.EmitOperationSignal(new OrchestrationInstance() { InstanceId = id.ToString() },
90+
Guid.NewGuid(),
91+
operationName,
92+
serializedInput,
93+
EntityMessageEvent.GetCappedScheduledTime(
94+
DateTime.UtcNow,
95+
TimeSpan.FromDays(3),
96+
scheduledTime?.UtcDateTime));
97+
98+
return serviceClient.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
99+
}
100+
101+
private AsyncPageable<TMetadata> GetAllEntitiesAsync<TMetadata>(
102+
Func<EntityBackendQueries.EntityMetadata, TMetadata> select,
103+
EntityQuery? filter)
104+
where TMetadata : notnull
105+
{
106+
return Pageable.Create(async (continuation, size, cancellation) =>
107+
{
108+
continuation ??= filter?.ContinuationToken;
109+
size ??= filter?.PageSize;
110+
EntityBackendQueries.EntityQueryResult result = await entityBackendQueries.QueryEntitiesAsync(
111+
new EntityBackendQueries.EntityQuery()
112+
{
113+
InstanceIdStartsWith = filter?.InstanceIdStartsWith ?? string.Empty,
114+
LastModifiedFrom = filter?.LastModifiedFrom?.UtcDateTime,
115+
LastModifiedTo = filter?.LastModifiedTo?.UtcDateTime,
116+
IncludeTransient = filter?.IncludeTransient ?? false,
117+
IncludeState = filter?.IncludeState ?? true,
118+
ContinuationToken = continuation,
119+
PageSize = size,
120+
},
121+
cancellation);
122+
123+
return new Page<TMetadata>([.. result.Results.Select(select)], result.ContinuationToken);
124+
});
125+
}
126+
127+
[return: NotNullIfNotNull(nameof(metadata))]
128+
private EntityMetadata<T>? Convert<T>(EntityBackendQueries.EntityMetadata? metadata)
129+
{
130+
if (metadata is null)
131+
{
132+
return null;
133+
}
134+
135+
return new EntityMetadata<T>(
136+
metadata.Value.EntityId.ToExtended(),
137+
dataConverter.Deserialize<T>(metadata.Value.SerializedState))
138+
{
139+
LastModifiedTime = metadata.Value.LastModifiedTime,
140+
BacklogQueueSize = metadata.Value.BacklogQueueSize,
141+
LockedBy = metadata.Value.LockedBy,
142+
};
143+
}
144+
145+
[return: NotNullIfNotNull(nameof(metadata))]
146+
private EntityMetadata? Convert(EntityBackendQueries.EntityMetadata? metadata)
147+
{
148+
if (metadata is null)
149+
{
150+
return null;
151+
}
152+
153+
var data = metadata.Value.SerializedState is null
154+
? null
155+
: new SerializedData(metadata.Value.SerializedState, dataConverter);
156+
157+
return new EntityMetadata(
158+
metadata.Value.EntityId.ToExtended(),
159+
data)
160+
{
161+
LastModifiedTime = metadata.Value.LastModifiedTime,
162+
BacklogQueueSize = metadata.Value.BacklogQueueSize,
163+
LockedBy = metadata.Value.LockedBy,
164+
};
165+
}
166+
167+
}

src/SelfHostedDurableTaskClient.cs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using DurableTask.AspNetCore.Extensions;
22
using DurableTask.Core;
3+
using DurableTask.Core.Entities;
34
using Microsoft.DurableTask;
45
using Microsoft.DurableTask.Client;
6+
using Microsoft.DurableTask.Client.Entities;
57
using Microsoft.Extensions.Options;
68

79

@@ -12,25 +14,47 @@ internal sealed class SelfHostedDurableTaskClient : DurableTaskClient
1214
{
1315
private readonly TaskHubClient client;
1416
private readonly DurableTaskClientOptions options;
17+
private SelfHostedDurableEntityClient? entities;
1518

1619
public SelfHostedDurableTaskClient(string name, TaskHubClient client, IOptions<DurableTaskClientOptions> options) : base(name)
1720
{
1821
this.client = client;
1922
this.options = options.Value;
2023
}
2124

22-
public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(OrchestrationQuery? filter = null)
25+
public override DurableEntityClient Entities
2326
{
24-
if (client.ServiceClient is not Core.Query.IOrchestrationServiceQueryClient queryClient)
27+
get
2528
{
26-
throw new NotSupportedException($"Provided IOrchestrationServiceClient does not implement {typeof(Core.Query.IOrchestrationServiceQueryClient)}.");
29+
if (entities is not null)
30+
{
31+
return entities;
32+
}
33+
34+
if (!options.EnableEntitySupport)
35+
{
36+
throw new InvalidOperationException("Entity support is not enabled.");
37+
}
38+
39+
var entityService = As<IEntityOrchestrationService>();
40+
if (entityService.EntityBackendQueries is null)
41+
{
42+
throw new InvalidOperationException("The configured IOrchestrationServiceClient does not support entities.");
43+
}
44+
45+
entities = new SelfHostedDurableEntityClient(client.ServiceClient, entityService.EntityBackendQueries, options.DataConverter, Name);
46+
return entities;
2747
}
48+
}
2849

50+
public override AsyncPageable<OrchestrationMetadata> GetAllInstancesAsync(OrchestrationQuery? filter = null)
51+
{
52+
var queryClient = As<Core.Query.IOrchestrationServiceQueryClient>();
2953
return Pageable.Create(async (continuation, pageSize, cancellation) =>
3054
{
3155
var coreQuery = new Core.Query.OrchestrationQuery()
3256
{
33-
RuntimeStatus = filter?.Statuses?.Select(x => x.ToCoreStatus()).ToArray(),
57+
RuntimeStatus = filter?.Statuses?.Select(x => x.ToCore()).ToArray(),
3458
CreatedTimeFrom = filter?.CreatedFrom?.UtcDateTime,
3559
CreatedTimeTo = filter?.CreatedTo?.UtcDateTime,
3660
TaskHubNames = filter?.TaskHubNames?.ToList(),
@@ -128,11 +152,41 @@ public override Task TerminateInstanceAsync(string instanceId, TerminateInstance
128152
return client.TerminateInstanceAsync(CreateOrchestrationInstance(instanceId), reason!);
129153
}
130154

155+
public override async Task<Microsoft.DurableTask.Client.PurgeResult> PurgeInstanceAsync(string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
156+
{
157+
cancellation.ThrowIfCancellationRequested();
158+
var result = await As<IOrchestrationServicePurgeClient>().PurgeInstanceStateAsync(instanceId);
159+
160+
return result.ToExpanded();
161+
}
162+
163+
public override async Task<Microsoft.DurableTask.Client.PurgeResult> PurgeAllInstancesAsync(PurgeInstancesFilter filter, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
164+
{
165+
cancellation.ThrowIfCancellationRequested();
166+
var result = await As<IOrchestrationServicePurgeClient>().PurgeInstanceStateAsync(new DurableTask.Core.PurgeInstanceFilter(
167+
filter.CreatedFrom?.UtcDateTime ?? DateTime.MinValue,
168+
filter.CreatedTo?.UtcDateTime,
169+
filter.Statuses?.Select(x => x.ToCore()).ToArray()
170+
));
171+
172+
return result.ToExpanded();
173+
}
174+
131175
public override ValueTask DisposeAsync()
132176
{
133177
return ValueTask.CompletedTask;
134178
}
135179

180+
private T As<T>()
181+
{
182+
if (client.ServiceClient is not T t)
183+
{
184+
throw new NotSupportedException($"Provided IOrchestrationServiceClient does not implement {typeof(T)}.");
185+
}
186+
187+
return t;
188+
}
189+
136190
private static OrchestrationInstance CreateOrchestrationInstance(string instanceId)
137191
=> new() { InstanceId = instanceId };
138192

@@ -142,7 +196,7 @@ private OrchestrationMetadata CreateMetadata(OrchestrationState state, bool incl
142196
{
143197
CreatedAt = state.CreatedTime,
144198
LastUpdatedAt = state.LastUpdatedTime,
145-
RuntimeStatus = state.OrchestrationStatus.ToRuntimeStatus(),
199+
RuntimeStatus = state.OrchestrationStatus.ToExtended(),
146200
SerializedInput = state.Input,
147201
SerializedOutput = state.Output,
148202
SerializedCustomStatus = state.Status,

0 commit comments

Comments
 (0)