Skip to content

Commit 754561d

Browse files
MarkCiliaVincentiDavidWiseman
authored andcommitted
Reduce locking complexity
1 parent 6ad446f commit 754561d

File tree

4 files changed

+25
-41
lines changed

4 files changed

+25
-41
lines changed

DBADash/DBADash.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
</None>
105105
</ItemGroup>
106106
<ItemGroup>
107-
<PackageReference Include="AsyncKeyedLock" Version="7.1.7" />
107+
<PackageReference Include="AsyncKeyedLock" Version="7.1.8" />
108108
<PackageReference Include="AWSSDK.Core" Version="4.0.3.1" />
109109
<PackageReference Include="AWSSDK.S3" Version="4.0.11.3" />
110110
<PackageReference Include="AWSSDK.SQS" Version="4.0.2.3" />

DBADash/Messaging/SQSMessageProcessing.cs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
using Amazon.SQS;
22
using Amazon.SQS.Model;
3+
using AsyncKeyedLock;
4+
using Microsoft.Data.SqlClient;
5+
using Polly;
6+
using Polly.Retry;
37
using Serilog;
48
using System;
59
using System.Collections.Generic;
6-
using System.Threading.Tasks;
7-
using Microsoft.Data.SqlClient;
8-
using Polly.Retry;
9-
using Polly;
10-
using System.Collections.Concurrent;
1110
using System.Data;
1211
using System.Linq;
1312
using System.Text;
1413
using System.Threading;
14+
using System.Threading.Tasks;
1515

1616
namespace DBADash.Messaging
1717
{
@@ -24,17 +24,17 @@ public class SQSMessageProcessing
2424
private const int delayAfterReceivingMessageForDifferentAgent = 1000; // ms
2525
private const int delayBetweenMessages = 100; // ms
2626
private const int errorDelay = 1000; // ms
27-
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new();
2827
private const int MaxDegreeOfParallelism = 2;
28+
private readonly AsyncKeyedLocker<string> _semaphores = new(o => o.MaxCount = MaxDegreeOfParallelism);
2929

3030
private static readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
3131
.AddRetry(new RetryStrategyOptions
32-
{
33-
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
34-
BackoffType = DelayBackoffType.Constant,
35-
MaxRetryAttempts = 2,
36-
Delay = TimeSpan.FromSeconds(1)
37-
})
32+
{
33+
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
34+
BackoffType = DelayBackoffType.Constant,
35+
MaxRetryAttempts = 2,
36+
Delay = TimeSpan.FromSeconds(1)
37+
})
3838
.Build();
3939

4040
public SQSMessageProcessing(CollectionConfig config)
@@ -227,9 +227,6 @@ private async Task ProcessMessageAsync(Message message, string DBADashAgentIdent
227227
if (msg == null) return;
228228

229229
// Implementations of MessageBase will process the message and return a DataSet or null
230-
var semaphore =
231-
_semaphores.GetOrAdd(destinationConnectionHash,
232-
_ => new SemaphoreSlim(MaxDegreeOfParallelism));
233230
try
234231
{
235232
if (msg is CancellationMessage cancellationMessage) // Process cancellation message immediately without waiting for semaphore
@@ -239,7 +236,8 @@ await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash
239236
ResponseMessage.ResponseTypes.Failure, "Message Cancelled").ConfigureAwait(false);
240237
return;
241238
}
242-
if (!await semaphore.WaitAsync(msg.SemaphoreTimeout)) // Semaphore used to limit concurrent processing per connection
239+
using var semaphore = await _semaphores.LockOrNullAsync(destinationConnectionHash, msg.SemaphoreTimeout);
240+
if (semaphore is null) // Semaphore used to limit concurrent processing per connection
243241
{
244242
Log.Warning("Semaphore timeout for message {id} of type {type} with handle {handle}. Service is busy.", msg.Id, msg.GetType().ToString(), handle);
245243
// Semaphore timed out, service is busy
@@ -248,17 +246,9 @@ await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash
248246

249247
return;
250248
}
251-
252-
try
253-
{
254-
await DoProcessMessageAsync(msg, DBADashAgentIdentifier, handle, destinationConnectionHash,
255-
replySQS,
256-
replyAgent);
257-
}
258-
finally
259-
{
260-
semaphore.Release();
261-
}
249+
await DoProcessMessageAsync(msg, DBADashAgentIdentifier, handle, destinationConnectionHash,
250+
replySQS,
251+
replyAgent);
262252
}
263253
catch (Exception ex)
264254
{
@@ -384,11 +374,11 @@ private async Task ProcessReply(Message message, Guid handle, string destination
384374
if (responseMessage.Data != null && responseMessage.Data.Tables.Contains("DBADash"))
385375
{
386376
Log.Debug("Writing data to SQL");
387-
377+
388378
await DestinationHandling.WriteDBAsync(responseMessage.Data, destination.ConnectionString,
389379
Config);
390-
391-
if (responseMessage.Exception==null && responseMessage.Data.Tables.Contains("Errors") && responseMessage.Data.Tables["Errors"]!.Rows.Count>0)
380+
381+
if (responseMessage.Exception == null && responseMessage.Data.Tables.Contains("Errors") && responseMessage.Data.Tables["Errors"]!.Rows.Count > 0)
392382
{
393383
var dtErrors = responseMessage.Data.Tables["Errors"];
394384
var sbErrors = new StringBuilder();

DBADashService/DBADashJob.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,11 @@ public async Task Execute(IJobExecutionContext context)
8080
Log.Debug("Wait for lock {0}", context.JobDetail.Key);
8181
// Ensures that S3 folder can only be processed by 1 job instance at a time.
8282
// Note: DisallowConcurrentExecution didn't prevent triggered at startup job from overlapping with the scheduled one
83-
var semaphore = ScheduleService.Locker.GetOrAdd(cfg.ConnectionString, _ => new SemaphoreSlim(1, 1));
84-
await semaphore.WaitAsync();
85-
try
83+
using (await ScheduleService.Locker.LockAsync(cfg.ConnectionString))
8684
{
8785
Log.Debug("Lock acquired {0}", context.JobDetail.Key);
8886
await CollectS3(cfg);
8987
}
90-
finally
91-
{
92-
semaphore.Release();
93-
}
94-
9588
break;
9689
}
9790
case ConnectionType.SQL:

DBADashService/SchedulerService.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using DBADash;
1+
using AsyncKeyedLock;
2+
using DBADash;
23
using DBADash.InstanceMetadata;
34
using DBADash.Messaging;
45
using Microsoft.Extensions.Hosting;
@@ -30,7 +31,7 @@ public class ScheduleService : BackgroundService
3031
private System.Timers.Timer folderCleanupTimer;
3132
private readonly CollectionSchedules schedules;
3233
private MessageProcessing messageProcessing;
33-
public static readonly ConcurrentDictionary<string, SemaphoreSlim> Locker = new();
34+
public static readonly AsyncKeyedLocker<string> Locker = new();
3435

3536
private static readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
3637
.AddRetry(new RetryStrategyOptions

0 commit comments

Comments
 (0)