Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions samples/MerQure.Samples/DeadLetterExample.cs
Original file line number Diff line number Diff line change
@@ -1,62 +1,62 @@
using MerQure.RbMQ.Content;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MerQure.Samples
namespace MerQure.Samples;

public class DeadLetterExample
{
public class DeadLetterExample
{
private readonly IMessagingService _messagingService;
private readonly IMessagingService _messagingService;

public DeadLetterExample(IMessagingService messagingService)
{
_messagingService = messagingService;
}
public DeadLetterExample(IMessagingService messagingService)
{
_messagingService = messagingService;
}

public void Run()
{
// RabbitMQ init
_messagingService.DeclareExchange("deadletter.exchange", Constants.ExchangeTypeFanout);
_messagingService.DeclareQueue("deadletter.queue", isQuorum: true);
_messagingService.DeclareBinding("deadletter.exchange", "deadletter.queue", "#");
public async Task RunAsync()
{
// RabbitMQ init
await _messagingService.DeclareExchangeAsync("deadletter.exchange", Constants.ExchangeTypeFanout);
await _messagingService.DeclareQueueAsync("deadletter.queue", isQuorum: true);
await _messagingService.DeclareBindingAsync("deadletter.exchange", "deadletter.queue", "#");

_messagingService.DeclareExchange("delay.exchange", Constants.ExchangeTypeHeaders);
_messagingService.DeclareQueueWithDeadLetterPolicy("deadletter.queue.5", "deadletter.exchange", 5000, null, isQuorum: true);
_messagingService.DeclareQueueWithDeadLetterPolicy("deadletter.queue.30", "deadletter.exchange", 30000, null, isQuorum: true);
await _messagingService.DeclareExchangeAsync("delay.exchange", Constants.ExchangeTypeHeaders);
await _messagingService.DeclareQueueWithDeadLetterPolicyAsync("deadletter.queue.5", "deadletter.exchange", 5000, null, isQuorum: true);
await _messagingService.DeclareQueueWithDeadLetterPolicyAsync("deadletter.queue.30", "deadletter.exchange", 30000, null, isQuorum: true);

_messagingService.DeclareBinding("delay.exchange", "deadletter.queue.5", "#", new Dictionary<string, object> { { "delay", 5 } });
_messagingService.DeclareBinding("delay.exchange", "deadletter.queue.30", "#", new Dictionary<string, object> { { "delay", 30 } });
await _messagingService.DeclareBindingAsync("delay.exchange", "deadletter.queue.5", "#", new Dictionary<string, object> { { "delay", 5 } });
await _messagingService.DeclareBindingAsync("delay.exchange", "deadletter.queue.30", "#", new Dictionary<string, object> { { "delay", 30 } });

var dateStart = DateTime.Now;
var dateStart = DateTime.Now;

// Get the publisher and publish messages
using (var publisher = _messagingService.GetPublisher("delay.exchange"))
// Get the publisher and publish messages
await using (var publisher = await _messagingService.GetPublisherAsync("delay.exchange"))
{
// publish messages waiting 5s
for (int i = 0; i <= 10; i++)
{
// publish messages waiting 5s
for (int i = 0; i <= 10; i++)
{
var message = new Message($"delay.5.message.{i}", $"Waited 5s: {i} !");
message.Header.GetProperties().Add("delay", 5);
publisher.Publish(message);
}
// publish messages waiting 30s
for (int i = 0; i <= 10; i++)
{
var message = new Message($"delay.30.message.{i}", $"Waited 30s: {i} !");
message.Header.GetProperties().Add("delay", 30);
publisher.Publish(message);
}
var message = new Message($"delay.5.message.{i}", $"Waited 5s: {i} !");
message.Header.GetProperties().Add("delay", 5);
await publisher.PublishAsync(message);
}

// Get the consumer on the existing queue and consume its messages
var consumer = _messagingService.GetConsumer("deadletter.queue");
consumer.Consume((object sender, IMessagingEvent args) =>
// publish messages waiting 30s
for (int i = 0; i <= 10; i++)
{
var realDelay = DateTime.Now.Subtract(dateStart).TotalSeconds;
Console.WriteLine(string.Format("{0} received after {1:#.##}s.", args.Message.GetRoutingKey(), realDelay));
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessage(args);
});
var message = new Message($"delay.30.message.{i}", $"Waited 30s: {i} !");
message.Header.GetProperties().Add("delay", 30);
await publisher.PublishAsync(message);
}
}

// Get the consumer on the existing queue and consume its messages
var consumer = await _messagingService.GetConsumerAsync("deadletter.queue");
await consumer.ConsumeAsync((object sender, IMessagingEvent args) =>
{
var realDelay = DateTime.Now.Subtract(dateStart).TotalSeconds;
Console.WriteLine(string.Format("{0} received after {1:#.##}s.", args.Message.GetRoutingKey(), realDelay));
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessageAsync(args);
});
}
}
}
2 changes: 1 addition & 1 deletion samples/MerQure.Samples/MerQure.Samples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="6.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.4" />
<PackageReference Include="RabbitMQ.Client" Version="7.1.2" />
</ItemGroup>

<ItemGroup>
Expand Down
16 changes: 9 additions & 7 deletions samples/MerQure.Samples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MerQure.Samples
{
static class Program
{
static void Main()
static async Task Main()
{
IServiceCollection services = new ServiceCollection();

Expand All @@ -16,17 +18,17 @@ static void Main()

Console.WriteLine("Running sample 1: simple");
var simpleExample = serviceProvider.GetService<SimpleExample>();
simpleExample.Run();
await simpleExample.RunAsync();

System.Threading.Thread.Sleep(500);
Thread.Sleep(500);
Console.WriteLine("Running sample 2: stop");
var stopExample = serviceProvider.GetService<StopExample>();
stopExample.Run();
await stopExample.RunAsync();

System.Threading.Thread.Sleep(500);
Thread.Sleep(500);
Console.WriteLine("Running sample 3: deadletter");
var deadLetterExample = serviceProvider.GetService<DeadLetterExample>();
deadLetterExample.Run();
await deadLetterExample.RunAsync();
}
}
}
}
78 changes: 38 additions & 40 deletions samples/MerQure.Samples/SimpleExample.cs
Original file line number Diff line number Diff line change
@@ -1,54 +1,52 @@
using MerQure.RbMQ.Content;
using System;
using System.Threading.Tasks;

namespace MerQure.Samples
namespace MerQure.Samples;

public class SimpleExample
{
public class SimpleExample
private readonly IMessagingService _messagingService;

public SimpleExample(IMessagingService messagingService)
{
_messagingService = messagingService;
}

public async Task RunAsync()
{
private readonly IMessagingService _messagingService;
// RabbitMQ init
await _messagingService.DeclareExchangeAsync("simple.exchange");
await _messagingService.DeclareQueueAsync("simple.queue", isQuorum: true);
await _messagingService.DeclareBindingAsync("simple.exchange", "simple.queue", "simple.message.*");

public SimpleExample(IMessagingService messagingService)
// Get the publisher and declare Exhange where publish messages
await using var publisher = await _messagingService.GetPublisherAsync("simple.exchange");
// publish messages
for (int i = 0; i <= 10; i++)
{
_messagingService = messagingService;
await publisher.PublishAsync(new Message($"simple.message.test{i}", $"Hello world {i} !"));
}

public void Run()
{
// RabbitMQ init
_messagingService.DeclareExchange("simple.exchange");
_messagingService.DeclareQueue("simple.queue", isQuorum: true);
_messagingService.DeclareBinding("simple.exchange", "simple.queue", "simple.message.*");

// Get the publisher and declare Exhange where publish messages
using (var publisher = _messagingService.GetPublisher("simple.exchange"))
// Get the consumer on the existing queue and consume its messages
var consumer = await _messagingService.GetConsumerAsync("simple.queue");
var random = new Random();
await consumer.ConsumeAsync((object sender, IMessagingEvent args) =>
{
// we simulate the delivery success
if (random.Next() % 2 == 0)
{
// publish messages
for (int i = 0; i <= 10; i++)
{
publisher.Publish(new Message($"simple.message.test{i}", $"Hello world {i} !"));
}
Console.WriteLine("Retry " + args.Message.GetRoutingKey());
// send NACK: negative acknowlegdment to the queue
consumer.RejectDeliveredMessageAsync(args);
}


// Get the consumer on the existing queue and consume its messages
var consumer = _messagingService.GetConsumer("simple.queue");
var random = new Random();
consumer.Consume((object sender, IMessagingEvent args) =>
else
{
// we simulate the delivery success
if (random.Next() % 2 == 0)
{
Console.WriteLine("Retry " + args.Message.GetRoutingKey());
// send NACK: negative acknowlegdment to the queue
consumer.RejectDeliveredMessage(args);
}
else
{
Console.WriteLine(args.Message.GetBody());
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessage(args);
}
});
}
Console.WriteLine(args.Message.GetBody());
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessageAsync(args);
}
});
}
}
}
86 changes: 43 additions & 43 deletions samples/MerQure.Samples/StopExample.cs
Original file line number Diff line number Diff line change
@@ -1,54 +1,54 @@
using MerQure.RbMQ.Content;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MerQure.Samples
namespace MerQure.Samples;

public class StopExample
{
public class StopExample
private readonly IMessagingService _messagingService;

public StopExample(IMessagingService messagingService)
{
_messagingService = messagingService;
}

public async Task RunAsync()
{
private readonly IMessagingService _messagingService;
// RabbitMQ init
await _messagingService.DeclareExchangeAsync("stop.exchange");
await _messagingService.DeclareQueueAsync("stop.queue", isQuorum: true);
await _messagingService.DeclareBindingAsync("stop.exchange", "stop.queue", "stop.message.* ");

public StopExample(IMessagingService messagingService)
// Get the publisher and declare Exhange where publish messages
await using var publisher = await _messagingService.GetPublisherAsync("stop.exchange");
// publish messages
for (int i = 0; i <= 20; i++)
{
_messagingService = messagingService;
await publisher.PublishAsync(new Message("stop.message.test" + i, $"Hello world {i} !"));
}

public void Run()
// Get the consumer on the existing queue and consume its messages
var consumer = await _messagingService.GetConsumerAsync("stop.queue");
await consumer.ConsumeAsync((_, args) =>
{
// RabbitMQ init
_messagingService.DeclareExchange("stop.exchange");
_messagingService.DeclareQueue("stop.queue", isQuorum: true);
_messagingService.DeclareBinding("stop.exchange", "stop.queue", "stop.message.* ");

// Get the publisher and declare Exhange where publish messages
using (var publisher = _messagingService.GetPublisher("stop.exchange"))
{
// publish messages
for (int i = 0; i <= 20; i++)
{
publisher.Publish(new Message("stop.message.test" + i, $"Hello world {i} !"));
}
}

// Get the consumer on the existing queue and consume its messages
var consumer = _messagingService.GetConsumer("stop.queue");
consumer.Consume((object sender, IMessagingEvent args) =>
{
Console.WriteLine("Consumer Working: " + consumer.IsConsuming());

// Process one message max every 10 ms
System.Threading.Thread.Sleep(10);
Console.WriteLine(args.Message.GetBody());
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessage(args);
});

// Stop Consuming after 100 ms ~ 10 messages
System.Threading.Thread.Sleep(100);
Console.WriteLine("Consumer Stopping ...");
consumer.StopConsuming((object sender, EventArgs args) =>
{
Console.WriteLine("Consumer Stopped: " + consumer.IsConsuming().ToString());
});
}
Console.WriteLine("Consumer Working: " + consumer.IsConsuming());

// Process one message max every 10 ms
Thread.Sleep(10);
Console.WriteLine(args.Message.GetBody());
// send ACK: acknowlegdment to the queue
consumer.AcknowlegdeDeliveredMessageAsync(args);
});

// Stop Consuming after 100 ms ~ 10 messages
Thread.Sleep(100);
Console.WriteLine("Consumer Stopping ...");
await consumer.StopConsuming((_, _) =>
{
Console.WriteLine("Consumer Stopped: " + consumer.IsConsuming());
return Task.CompletedTask;
});
}
}
}
34 changes: 17 additions & 17 deletions samples/MerQure.Tools.Samples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
using MerQure.Tools.Samples.RetryBusExample.Domain;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;

namespace MerQure.Tools.Samples
namespace MerQure.Tools.Samples;

public static class Program
{
public static class Program
static async Task Main(string[] args)
{
static void Main(string[] args)
{
IServiceCollection services = new ServiceCollection();

var startup = new Startup();
startup.ConfigureServices(services);
IServiceCollection services = new ServiceCollection();

var serviceProvider = services.BuildServiceProvider();
var startup = new Startup();
startup.ConfigureServices(services);

var actionService = serviceProvider.GetService<ActionService>();
actionService.Consume();
var serviceProvider = services.BuildServiceProvider();

for (int i = 0; i < 50; i++)
{
actionService.SendNewSample();
}
var actionService = serviceProvider.GetService<ActionService>();
await actionService.ConsumeAsync();

Console.ReadLine();
for (int i = 0; i < 50; i++)
{
await actionService.SendNewSampleAsync();
}

Console.ReadLine();
}
}
}
Loading
Loading